30 Jul 2016

feedPlanet Python

Podcast.__init__: Episode 68 - Test Engineering with Cris Medina

Summary

We all know that testing is an important part of software and systems development. The problem is that as our systems and applications grow, the amount of testing necessary increases at an exponential rate. Cris Medina joins us this week to talk about some of the problems and approaches associated with testing these complex systems and some of the ways that Python can help.

Brief Introduction

- Hello and welcome to Podcast.__init__, the podcast about Python and the people who make it great.
- I would like to thank everyone who has donated to the show. Your contributions help us make the show sustainable. For details on how to support the show you can visit our site at pythonpodcast.com
- Linode is sponsoring us this week. Check them out at linode.com/podcastinit and use the promo code podcastinit20 to get a $20 credit to try out their fast and reliable Linux virtual servers for your next project.
- We are also sponsored by Sentry this week. Stop hoping your users will report bugs. Sentry's real-time tracking gives you insight into production deployments and information to reproduce and fix crashes. Check them out at getsentry.com and use the code podcastinit at signup to get a $50 credit!
- Hired has also returned as a sponsor this week. If you're looking for a job as a developer or designer then Hired will bring the opportunities to you. On Hired, software engineers and designers can get 5+ interview requests in a week, and each offer has salary and equity upfront. With full-time and contract opportunities available, users can view the offers and accept or reject them before talking to any company. Work with over 2,500 companies, from startups to large public companies, hailing from 12 major tech hubs in North America and Europe. Hired is totally free for users, and if you get a job you'll get a $2,000 "thank you" bonus. If you use our special link (hired.com/podcastinit) to sign up, that bonus doubles to $4,000 when you accept a job. If you're not looking for a job but know someone who is, you can refer them to Hired and get a $1,337 bonus when they accept a job.
- The O'Reilly Velocity conference is coming to New York this September and we have a free ticket to give away. If you would like the chance to win it then just sign up for our newsletter at pythonpodcast.com
- To help other people find the show you can leave a review on iTunes, and tell your friends and co-workers.
- Join our community! Visit discourse.pythonpodcast.com for your opportunity to find out about upcoming guests, suggest questions, and propose show ideas.
- Your hosts as usual are Tobias Macey and Chris Patti.
- Today we're interviewing Cris Medina about test engineering for large and complex systems.

Interview with Cris Medina

- Introductions
- How did you get introduced to Python? - Chris
- To get us started, can you share your definition of test engineering and how it differs from the types of testing that your average developer is used to? - Tobias
- What are some common industries or situations where this kind of test engineering becomes necessary? - Tobias
- How and where does Python fit into the kind of testing that becomes necessary when dealing with these complex systems? - Tobias
- How do you determine which areas of a system to test, and how can Python help in that discovery process? - Tobias
- What are some of your favorite tools and libraries for this kind of work? - Tobias
- What are some of the areas where the existing Python tooling falls short? - Tobias
- Given the breadth of concerns that are encompassed with testing the various components of these large systems, what are some ways that a test engineer can get a high-level view of the overall state? - Tobias
- How can that information be distilled for presentation to other areas of the business? - Tobias
- Could […]

Keep In Touch

Picks

Links

The intro and outro music is from Requiem for a Fish by The Freak Fandango Orchestra / CC BY-SA

30 Jul 2016 7:21pm GMT

Davy Wybiral: WebGL terrain tutorial



Today I put together a quick tutorial on how to build a 3D terrain in JavaScript, from setting up three.js to loading a heightmap and texture. More will be added later (controls and collision).

Each step is organized as a release in the GitHub repo, so you can download them all individually or browse through the commit diffs to see each change being made.

The GitHub project is here: https://github.com/wybiral/terrain

And, of course, there's a demo of it here: https://wybiral.github.io/terrain/

30 Jul 2016 10:53am GMT

Weekly Python StackOverflow Report: (xxx) stackoverflow python report

These are the ten highest-rated questions on Stack Overflow last week.
Between brackets: [question score / answer count]
Build date: 2016-07-30 09:25:51 GMT


  1. rounding errors in Python floor division - [20/5]
  2. 0 is 0 == 0 (#evaluates to True?) - [16/2]
  3. Type hint that a function never returns - [9/0]
  4. Python: How to delete rows ending in certain characters? - [8/3]
  5. Pycharm import RuntimeWarning after updating to 2016.2 - [8/1]
  6. Combination of two lists while keeping the order - [7/5]
  7. Convert Python sequence to NumPy array, filling missing values - [7/5]
  8. Python- np.mean() giving wrong means? - [7/1]
  9. Python: get every possible combination of weights for a portfolio - [6/6]
  10. Removing duplicate edges from graph in Python list - [6/4]

30 Jul 2016 9:26am GMT

Full Stack Python: Python for Entrepreneurs

Python for Entrepreneurs is a new video course by the creators of Talk Python to Me and Full Stack Python.

We are creating this course and running a Kickstarter for it based on feedback that it's still too damn difficult to turn basic Python programming knowledge into a business that generates income as a side or full-time project. Both Michael and I have been able to make that happen for ourselves, and we want to share every difficult lesson we've learned through this course.

The Python for Entrepreneurs videos and content will dive into building and deploying a real-world web application, marketing it to prospective customers, handling search engine optimization, making money through credit card payments, getting help from part-time contractors for niche tasks and scaling up to meet traffic demands.

If this course hits the mark for what you want to do with Python, check out the Kickstarter - we've set up steep discounts for early backers.

If you have any questions, please reach out to Michael Kennedy or me, Matt Makai.

30 Jul 2016 4:00am GMT

29 Jul 2016

feedPlanet Python

Brian Okken: 20: Talk Python To Me host Michael Kennedy

I talk with Michael about: episodes of his show having to do with testing; his transition from employee to podcast host and online training entrepreneur; his Python training courses; and the Pyramid web framework. Giveaway: we're giving away courses from Talk Python To Me. Enter by signing up for my mailing list at pythontesting.net/subscribe. Courses by […]

The post 20: Talk Python To Me host Michael Kennedy appeared first on Python Testing.

29 Jul 2016 6:58pm GMT

Jamal Moir: [Video Series] Taking Your Python Skills to the Next Level With Pythonic Code – Hacking Python’s Memory With __slots__

This is the second post in a series covering Pythonic code written by Michael Kennedy of Talk Python To Me. Be sure to catch the whole series with 5 powerful Pythonic recommendations and over 45 minutes of video examples. What if I told you there was a simple technique you can apply to your custom classes that would dramatically decrease […]

The post [Video Series] Taking Your Python Skills to the Next Level With Pythonic Code - Hacking Python's Memory With __slots__ appeared first on Data Dependence.
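
The teaser above cuts off, but the technique the post's title refers to is __slots__; here is a minimal sketch (class names invented for illustration) of how declaring __slots__ drops the per-instance __dict__ and shrinks objects:

import sys

class PlainPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class SlottedPoint:
    # __slots__ replaces the per-instance __dict__ with fixed attribute slots
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x = x
        self.y = y

p, s = PlainPoint(1, 2), SlottedPoint(1, 2)
print(hasattr(p, '__dict__'), hasattr(s, '__dict__'))  # True False
print(sys.getsizeof(p.__dict__))  # per-instance dict overhead the slotted class avoids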

29 Jul 2016 5:00pm GMT

بايثون العربي: An Introduction to the urllib Library

The urllib module in Python 3 is a collection of modules that we can use to work with URLs. If you're coming from Python 2, you'll find that Python 2 had urllib and urllib2, while in Python 3 they became a single package named urllib. The current version consists of urllib.request, urllib.error, urllib.parse, and urllib.robotparser.

We will cover each part except urllib.error.

urllib.request

This module is used primarily for opening and fetching URLs. Let's work through some examples to see what we can do with the urlopen function:


>>> import urllib.request
>>> url = urllib.request.urlopen('https://www.google.com/')
>>> url.geturl()
'https://www.google.com/'
>>> url.info()
<http.client.HTTPMessage object at 0x7fddc2de04e0>
>>> header = url.info()
>>> header.as_string()
('Date: Fri, 24 Jun 2016 18:21:19 GMT\n'
'Expires: -1\n'
'Cache-Control: private, max-age=0\n'
'Content-Type: text/html; charset=ISO-8859-1\n'
'P3P: CP="This is not a P3P policy! See '
'https://www.google.com/support/accounts/answer/151657?hl=en for more info."\n'
'Server: gws\n'
'X-XSS-Protection: 1; mode=block\n'
'X-Frame-Options: SAMEORIGIN\n'
'Set-Cookie: '
'NID=80=tYjmy0JY6flsSVj7DPSSZNOuqdvqKfKHDcHsPIGu3xFv41LvH_Jg6LrUsDgkPrtM2hmZ3j9V76pS4K_
cBg7pdwueMQfr0DFzw33SwpGex5qzLkXUvUVPfe9g699Qz4cx9ipcbU3HKwrRYA; '
'expires=Sat, 24-Dec-2016 18:21:19 GMT; path=/; domain=.google.com; HttpOnly\n'
'Alternate-Protocol: 443:quic\n'
'Alt-Svc: quic=":443"; ma=2592000; v="34,33,32,31,30,29,28,27,26,25"\n'
'Accept-Ranges: none\n'
'Vary: Accept-Encoding\n'
'Connection: close\n'
'\n')
>>> url.getcode()
200

We called our module and opened the Google site, which gave us an HTTPResponse object to work with. The first thing we did was run geturl to get the URL, which is useful for finding out whether or not we were redirected to another URL.

Next we called info, which returns metadata about the page, such as the headers. We assigned its result to a header variable and then called as_string, which prints the headers received from Google. We can also get the HTTP response code by calling getcode; in our case it was 200, which means the operation succeeded.

If you want to look at the page's HTML, you can call the read method on the url variable we created. I won't show the output here because it would be fairly long, but try it yourself to confirm.

Also note that the request defaults to GET unless you specify the data parameter.
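
As a sketch of that default: supplying the data argument (URL-encoded bytes) makes urlopen issue a POST instead. httpbin.org is used here purely as an example echo service:

>>> import urllib.parse
>>> import urllib.request
>>> # URL-encode the form fields and encode to bytes, as urlopen expects
>>> data = urllib.parse.urlencode({'q': 'test'}).encode('ascii')
>>> # Passing data switches the request method from GET to POST
>>> response = urllib.request.urlopen('http://httpbin.org/post', data)
>>> response.getcode()
200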

Downloading a File

One of the most important uses of the urllib package is downloading files, so let's look at some examples of how to do that:


>>> import urllib.request
>>> url = 'http://www.pyarab.com/wp-content/uploads/2016/07/n.zip'
>>> response = urllib.request.urlopen(url)
>>> data = response.read()
>>> with open('/home/mike/Desktop/test.zip', 'wb') as fobj:
...     fobj.write(data)
...

Here we opened the URL that leads to the file we want, then read the data, which completes the download.

There is another way to accomplish the same thing, using urlretrieve:


>>> import urllib.request
>>> url = 'http://www.pyarab.com/wp-content/uploads/2016/07/n.zip'
>>> urllib.request.urlretrieve(url, '/home/mike/Desktop/blog.zip')
('/home/mike/Desktop/blog.zip',
 <http.client.HTTPMessage object at 0x7fddc21c2470>)

This method copies the network object to a local file. Unless you pass a destination, as we did above, the file is given a random name and placed in a temporary directory:


>>> import urllib.request
>>> url = 'http://www.blog.pythonlibrary.org/wp-content/uploads/2012/06/wxDbViewer.zip'
>>> tmp_file, header = urllib.request.urlretrieve(url)
>>> with open('/home/mike/Desktop/test.zip', 'wb') as fobj:
...     with open(tmp_file, 'rb') as tmp:
...         fobj.write(tmp.read())

As you can see, it ends up showing the path the file was saved to, along with the request's header information.

Setting Your Own User Agent

When you visit a website with a browser, the browser tells the site who it is; this is known as the user-agent string. Python's urllib identifies itself as Python-urllib/x.y, where x and y are the major and minor Python version numbers. Some websites won't recognize this user-agent string and will behave strangely or not work at all.

Fortunately, you can set up a user-agent string of your own:


>>> import urllib.request
>>> user_agent = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0'
>>> url = 'http://www.pyarab.com/'
>>> headers = {'User-Agent': user_agent}
>>> request = urllib.request.Request(url, headers=headers)
>>> with urllib.request.urlopen(request) as response:
...     with open('/home/mdriscoll/Desktop/user_agent.html', 'wb') as out:
...         out.write(response.read())

Here we set the user agent to Mozilla Firefox and set the site we want to test our user agent against. We then created a Request with our URL and headers, passed it to urlopen, and finally saved the result. If you open the output file, you'll find that we changed the user-agent string successfully.

Try changing the user agent a few times with this code until you understand how it works.

urllib.parse

The urllib.parse library is the standard interface for breaking URL strings apart and putting them back together, and it can also be used to convert relative URLs into absolute ones. Let's take an example of parsing a URL that contains a query string:


>>> from urllib.parse import urlparse
>>> result = urlparse('https://duckduckgo.com/?q=python+stubbing&t=canonical&ia=qa')
>>> result
ParseResult(scheme='https', netloc='duckduckgo.com', path='/', params='', query='q=python+stubbing&t=canonical&ia=qa', fragment='')
>>> result.netloc
'duckduckgo.com'
>>> result.geturl()
'https://duckduckgo.com/?q=python+stubbing&t=canonical&ia=qa'
>>> result.port
None

We called the urlparse function and passed it a URL whose query string searches for python stubbing. The result was a ParseResult object, which we can use to learn more about the URL (the port number, network location, path, and so on).
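
A quick sketch of the recombining and relative-to-absolute conversion mentioned earlier, using the standard urlunparse and urljoin functions from the same module:

>>> from urllib.parse import urljoin, urlunparse
>>> # Rebuild a URL from the six components of a ParseResult
>>> urlunparse(('https', 'duckduckgo.com', '/', '', 'q=python', ''))
'https://duckduckgo.com/?q=python'
>>> # Resolve a relative link against a base URL
>>> urljoin('https://duckduckgo.com/help/start', 'about')
'https://duckduckgo.com/help/about'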

Submitting a Web Form

This module also contains the urlencode method, which is great for passing data into a URL's query string. A common use case for urllib.parse is submitting web forms, so let's find out how we might search duckduckgo for Python:


>>> import urllib.request
>>> import urllib.parse
>>> data = urllib.parse.urlencode({'q': 'Python'})
>>> data
'q=Python'
>>> url = 'http://duckduckgo.com/html/'
>>> full_url = url + '?' + data
>>> response = urllib.request.urlopen(full_url)
>>> with open('/home/mike/Desktop/results.html', 'wb') as f:
...     f.write(response.read())

A clear and simple example: essentially, we want to submit a query to duckduckgo ourselves, using Python instead of a browser. To do that we build a query string with urlencode, join everything together to form a fully qualified URL, and then use urllib.request to submit the form. Afterwards we grab the result and save it.

At this point you should be able to work with the urllib package. We've learned how to download a file, submit a web form, and change the user agent, but that doesn't mean these are the only things the library can do; on the contrary, there are many more tasks it can handle.

29 Jul 2016 11:44am GMT

Simon: udatetime a fast RFC3339 compliant date-time Python library

Working with date-time formats can be pretty upsetting because of the variety of different formats people can come up with. Date-times are used everywhere, not just in logging or metadata in database entries, and they are pretty important. That's why I encourage developers to use the ISO 8601 derived RFC3339 standard for their projects.

RFC3339 date-time: 2016-07-18T12:58:26.485897+02:00

The RFC3339 specification offers the following advantages:

Having a date-time standard is nice, but using Python's datetime library to parse/format an RFC3339 date-time string, or even to create a datetime object in UTC or the local timezone, can be painful and slowwwww. That's why I decided to implement a Python 2 library to deal with such tasks. The library is called udatetime and is available on GitHub and PyPI.

$ pip install udatetime

The goal of the library is to be fast and handy with RFC3339 date-time formatted strings. The average performance increase of udatetime compared to the equivalent datetime code is 76%. Due to its use of the Python 2 CPython API and POSIX features, the library currently only supports POSIX systems and is not Python 3 or PyPy compatible. I'm working on cross-platform and PyPy support. Help with the library is greatly appreciated.
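
Judging from the functions exercised in the benchmark below, basic usage looks roughly like this (a sketch, not a complete tour of the API):

>>> import udatetime
>>> # Parse an RFC3339 string into a timezone-aware datetime...
>>> dt = udatetime.from_string('2016-07-18T12:58:26.485897+02:00')
>>> # ...and format a datetime back into an RFC3339 string
>>> udatetime.to_string(dt)
'2016-07-18T12:58:26.485897+02:00'
>>> udatetime.utcnow()            # aware datetime for the current UTC time
>>> udatetime.utcnow_to_string()  # the same, directly as an RFC3339 string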

Benchmark

The benchmark setup is the following.

from datetime import datetime
import udatetime

RFC3339_DATE = '2016-07-18'
RFC3339_TIME = '12:58:26.485897+02:00'
RFC3339_DATE_TIME = RFC3339_DATE + 'T' + RFC3339_TIME
RFC3339_DATE_TIME_DTLIB = RFC3339_DATE_TIME[:-6]  # datetime can't parse timezones through strptime
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
DATETIME_OBJ = datetime.strptime(RFC3339_DATE_TIME_DTLIB, DATE_TIME_FORMAT)


def benchmark_parse():
    def datetime_strptime():
        datetime.strptime(RFC3339_DATE_TIME_DTLIB, DATE_TIME_FORMAT)

    def udatetime_parse():
        udatetime.from_string(RFC3339_DATE_TIME)

    return (datetime_strptime, udatetime_parse)


def benchmark_format():
    def datetime_strftime():
        DATETIME_OBJ.strftime(DATE_TIME_FORMAT)

    def udatetime_format():
        udatetime.to_string(DATETIME_OBJ)

    return (datetime_strftime, udatetime_format)


def benchmark_utcnow():
    def datetime_utcnow():
        datetime.utcnow()

    def udatetime_utcnow():
        udatetime.utcnow()

    return (datetime_utcnow, udatetime_utcnow)


def benchmark_now():
    def datetime_now():
        datetime.now()

    def udatetime_now():
        udatetime.now()

    return (datetime_now, udatetime_now)


def benchmark_utcnow_to_string():
    def datetime_utcnow_to_string():
        datetime.utcnow().strftime(DATE_TIME_FORMAT)

    def udatetime_utcnow_to_string():
        udatetime.utcnow_to_string()

    return (datetime_utcnow_to_string, udatetime_utcnow_to_string)


def benchmark_now_to_string():
    def datetime_now_to_string():
        datetime.now().strftime(DATE_TIME_FORMAT)

    def udatetime_now_to_string():
        udatetime.now_to_string()

    return (datetime_now_to_string, udatetime_now_to_string)

If you like you can run the benchmark yourself by running the bench.py script from the repository.

The results of 1 million executions and 3 repeats look like this.
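
The bench.py harness itself isn't reproduced here, but a comparable measurement can be taken with the stdlib timeit module; a minimal sketch for the parse case, assuming it runs in the same module as the benchmark setup above:

import timeit

# repeat() returns one total runtime (in seconds) per repeat; compare the minima.
dt_times = timeit.repeat(
    'datetime.strptime(RFC3339_DATE_TIME_DTLIB, DATE_TIME_FORMAT)',
    setup='from __main__ import datetime, RFC3339_DATE_TIME_DTLIB, DATE_TIME_FORMAT',
    number=1000000, repeat=3)
u_times = timeit.repeat(
    'udatetime.from_string(RFC3339_DATE_TIME)',
    setup='from __main__ import udatetime, RFC3339_DATE_TIME',
    number=1000000, repeat=3)
print('datetime: %.2fs   udatetime: %.2fs' % (min(dt_times), min(u_times)))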

benchmark_parse

datetime.strptime(RFC3339_DATE_TIME_DTLIB, DATE_TIME_FORMAT)
vs
udatetime.from_string(RFC3339_DATE_TIME)

Benchmark results benchmark_parse

benchmark_format

DATETIME_OBJ.strftime(DATE_TIME_FORMAT)
vs
udatetime.to_string(DATETIME_OBJ)

Benchmark results benchmark_format

benchmark_now

datetime.now()
vs
udatetime.now()

Benchmark results benchmark_now

benchmark_utcnow

datetime.utcnow()
vs
udatetime.utcnow()

Benchmark results benchmark_utcnow

benchmark_now_to_string

datetime.now().strftime(DATE_TIME_FORMAT)
vs
udatetime.now_to_string()

Benchmark results benchmark_now_to_string

benchmark_utcnow_to_string

datetime.utcnow().strftime(DATE_TIME_FORMAT)
vs
udatetime.utcnow_to_string()

Benchmark results benchmark_utcnow_to_string

29 Jul 2016 9:29am GMT

Semaphore Community: Getting Started with Behavior Testing in Python with Behave

This article is brought with ❤ to you by Semaphore.

Introduction

Behavior testing simply means that we should test how an application behaves in certain situations. Often the behavior is given to us developers by our customers. They describe the functionality of an application, and we write code to meet their specifications. Behavioral tests are a tool to formalize their requirements into tests. This leads naturally to behavior-driven development (BDD).

After completing this tutorial, you should be able to:

Prerequisites

Before starting, make sure you have the following installed:

Setting Up Your Environment

This tutorial will walk you through writing tests for and coding a feature of a Twenty-One (or "Blackjack") game. Specifically, we'll be testing the logic for the dealer. To get started, create a root directory where your code will go, and then create the following directories and blank files:

.
├── features
│   ├── dealer.feature
│   └── steps
│       └── steps.py
└── twentyone.py

Here's a brief explanation of the files:

Writing Your First Test

Although behavioral tests do not require test-driven development, the two methodologies go hand-in-hand. We'll approach this problem from a test-driven perspective, so instead of jumping to code, we'll start with the tests.

Writing the Scenario

Open dealer.feature and add the following first line:

Feature: The dealer for the game of 21

This line describes the feature. In a large application, you would have many features. Next, we'll add a test. The first test will be simple - when the round starts, the dealer should deal itself two cards. The word Behave uses to define a test is "Scenario", so go ahead and add the following line:

Scenario: Deal initial cards

Before we write more, we need to understand the three phases of a basic Behave test: "Given", "When", and "Then". "Given" initializes a state, "When" describes an action, and "Then" states the expected outcome. For this test, our state is a new dealer object, the action is the round starting, and the expected outcome is that the dealer has two cards. Here's how this is translated into a Behave test:

Scenario: Deal initial cards
  Given a dealer
  When the round starts
  Then the dealer gives itself two cards

Notice that the three phases read like a normal English sentence. You should strive for this when writing behavioral tests because they are easily readable by anyone working in the code base.

Now to see how Behave works, simply open a terminal in the root directory of your code and run the following command:

behave

You should see this output:

Feature: The dealer for the game of 21 # features/dealer.feature:1

  Scenario: Deal initial cards             # features/dealer.feature:3
    Given a dealer                         # None
    When the round starts                  # None
    Then the dealer gives itself two cards # None


Failing scenarios:
  features/dealer.feature:3  Deal initial cards

0 features passed, 1 failed, 0 skipped
0 scenarios passed, 1 failed, 0 skipped
0 steps passed, 0 failed, 0 skipped, 3 undefined
Took 0m0.000s

You can implement step definitions for undefined steps with these snippets:
[ The rest of output removed for brevity ]

The key part here is that we have one failing scenario (and therefore a failing feature) that we need to fix. Below that, Behave suggests how to implement steps. You can think of a step as a task for Behave to execute, and each phase ("given", "when", and "then") is implemented as a step.

Writing the Steps

The steps that Behave runs are written in Python and they are the link between the descriptive tests in .feature files and the actual application code. Go ahead and open steps.py and add the following imports:

from behave import *
from twentyone import *

Behave steps use annotations that match the names of the phases. This is the first step as described in the scenario:

@given('a dealer')
def step_impl(context):
    context.dealer = Dealer()

It's important to notice that the text inside of the annotation matches the scenario text exactly. If it doesn't match, the test cannot run.

The context object is passed from step to step, and it is where we can store information to be used by other steps. Since this step is a "given", we need to initialize our state. We do that by creating a Dealer object, and attaching that object to the context. If you run behave again, you'll see the test fails, but now for a different reason: We haven't defined the Dealer class yet! Again, we have a failing test that is "driving" us to do work.

Now we will open twentyone.py and create a Dealer class:

class Dealer():
    pass

Run behave once again to verify that we fixed the last error we saw, but that the scenario still fails because the "when" and "then" steps are not implemented. From here on, the tutorial will not explicitly state when you should run behave. But remember, the cycle is to write a test, see that it fails, and then write code to make the test pass.

Here are the next steps to add to steps.py:

@when('the round starts')
def step_impl(context):
    context.dealer.new_round()


@then('the dealer gives itself two cards')
def step_impl(context):
    assert (len(context.dealer.hand) == 2)

Again, the annotation text matches the text in the scenario exactly. In the "when" step, we have access to the dealer created in "given" and we can now call a method on that object. Finally, in the "then" step, we still have access to the dealer, and we assert that the dealer has two cards in its hand.

We defined two new pieces of code that need to be implemented: new_round() and hand. Switch back to twentyone.py and add the following to the Dealer class:

class Dealer():
    def __init__(self):
        self.hand = []

    def new_round(self):
        self.hand = [_next_card(), _next_card()]

The _next_card() function will be defined as a top-level function of the module, along with a definition of the cards. At the top of the file, add the following:

import random

_cards = ['A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K']


def _next_card():
    return random.choice(_cards)

Remember that random is not secure and should not be used in a real implementation of this game, but for this tutorial it will be fine.
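
If you did want unpredictable deals, one option (an aside, not part of the tutorial's code) is the OS-backed generator in the same module; the dedicated secrets module only arrived later, in Python 3.6:

import random

_secure_random = random.SystemRandom()  # draws randomness from os.urandom

def _next_card():
    # Same interface as before, but not seedable or reproducible
    return _secure_random.choice(_cards)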

If you run behave now, you should see that the test passes:

Feature: The dealer for the game of 21 # features/dealer.feature:1

  Scenario: Deal initial cards             # features/dealer.feature:3
    Given a dealer                         # features/steps/steps.py:5 0.000s
    When the round starts                  # features/steps/steps.py:9 0.000s
    Then the dealer gives itself two cards # features/steps/steps.py:14 0.000s

1 feature passed, 0 failed, 0 skipped
1 scenario passed, 0 failed, 0 skipped
3 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.000s

Writing Tableized Tests

Often when writing tests we want to test the same behavior against many different parameters and check the results. Behave makes this easier to do by providing tools to create a tableized test instead of writing out each test separately. The next game logic to test is that the dealer knows the point value of its hand. Here is a test that checks several scenarios:

Scenario Outline: Get hand total
  Given a <hand>
  When the dealer sums the cards
  Then the <total> is correct

  Examples: Hands
  | hand          | total |
  | 5,7           | 12    |
  | 5,Q           | 15    |
  | Q,Q,A         | 21    |
  | Q,A           | 21    |
  | A,A,A         | 13    |

You should recognize the familiar "given, when, then" pattern, but there are a lot of differences in this test. First, it is called a "Scenario Outline". Next, it uses parameters in angle brackets that correspond to the headers of the table. Finally, there's a table of inputs ("hand") and outputs ("total").

The steps will be similar to what we've seen before, but we'll now get to use the parameterized steps feature of Behave.

Here's how to implement the new "given" step:

@given('a {hand}')
def step_impl(context, hand):
    context.dealer = Dealer()
    context.dealer.hand = hand.split(',')

The angle brackets in the dealer.feature file are replaced with braces, and the hand parameter becomes an object that is passed to the step, along with the context.

Just like before, we create a new Dealer object, but this time we manually set the dealer's cards instead of generating them randomly. Since the hand parameter is a simple string, we split the parameter to get a list.

Next, add the remaining steps:

@when('the dealer sums the cards')
def step_impl(context):
    context.dealer_total = context.dealer.get_hand_total()

@then('the {total:d} is correct')
def step_impl(context, total):
    assert (context.dealer_total == total)

The "when" step is nothing new, and the "then" step should look familiar. If you're wondering about the ":d" after the total parameter, that is a shortcut to tell Behave to treat the parameter as an integer. It saves us from manually casting with the int() function. Here's a complete list of patterns that Behave accepts and if you need advanced parsing, you can define your own pattern.

There are many different approaches to summing the values of cards, but here's one solution for finding the total of the dealer's hand. Create this as a top-level function in the twentyone.py module:

def _hand_total(hand):
    values = [None, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, 10]
    value_map = {k: v for k, v in zip(_cards, values)}

    total = sum([value_map[card] for card in hand if card != 'A'])
    ace_count = hand.count('A')

    for i in range(ace_count, -1, -1):
        if i == 0:
            total = total + ace_count
        elif total + (i * 11) + (ace_count - i) <= 21:
            total = total + (i * 11) + ace_count - i
            break

    return total

In short, the function maps the card character strings to point values and sums them. However, aces have to be handled separately because they can be worth either 1 or 11 points.
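
Tracing the ace handling against the table's examples makes the loop's intent clearer:

# The loop tries the largest number of aces counted as 11 that keeps
# the total at or below 21; any remaining aces count as 1.
print(_hand_total(['5', '7']))       # 12
print(_hand_total(['Q', 'A']))       # 21 (ace counts as 11)
print(_hand_total(['Q', 'Q', 'A']))  # 21 (ace drops to 1)
print(_hand_total(['A', 'A', 'A']))  # 13 (one 11 + two 1s)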

We also need to give the dealer the ability to total its cards. Add this function to the Dealer class:

def get_hand_total(self):
    return _hand_total(self.hand)

If you run behave now, you'll see that each example in the table runs as its own scenario. This saves a lot of space in the features file, but still gives us rigorous tests that pass or fail individually.

We'll add one more tableized test, this time to test that the dealer plays by the rules. Traditionally, the dealer must play "hit" until he or she has 17 or more points. Add this scenario outline to test that behavior:

Scenario Outline: Dealer plays by the rules
  Given a hand <total>
  When the dealer determines a play
  Then the <play> is correct

  Examples: Hands
  | total  | play   |
  | 10     | hit    |
  | 15     | hit    |
  | 16     | hit    |
  | 17     | stand  |
  | 18     | stand  |
  | 19     | stand  |
  | 20     | stand  |
  | 21     | stand  |
  | 22     | stand  |

Before we add the next steps, it's important to understand that when using parameters, the order matters. Parameterized steps should be ordered from most restrictive to least restrictive. If you do not do this, the correct step may not be matched by Behave. To make this easier, group your steps by type. Here is the new given step, ordered properly:

@given('a dealer')
def step_impl(context):
    context.dealer = Dealer()

## NEW STEP
@given('a hand {total:d}')
def step_impl(context, total):
    context.dealer = Dealer()
    context.total = total


@given('a {hand}')
def step_impl(context, hand):
    context.dealer = Dealer()
    context.dealer.hand = hand.split(',')

The typed parameter {total:d} is more restrictive than the untyped {hand}, so it must come earlier in the file.

The new "when" step is not parameterized and can be placed anywhere, but, for readability, should be grouped with the other when steps:

@when('the dealer determines a play')
def step_impl(context):
    context.dealer_play = context.dealer.determine_play(context.total)

Notice that this test expects a determine_play() method, which we can add to the Dealer class:

def determine_play(self, total):
    if total < 17:
        return 'hit'
    else:
        return 'stand'

Last, the "then" step is parameterized so it needs to also be ordered properly:

@then('the dealer gives itself two cards')
def step_impl(context):
    assert (len(context.dealer.hand) == 2)


@then('the {total:d} is correct')
def step_impl(context, total):
    assert (context.dealer_total == total)

## NEW STEP
@then('the {play} is correct')
def step_impl(context, play):
    assert (context.dealer_play == play)

Putting Everything Together

We're going to add one final test that will tie together all of the code we've just written. We've proven to ourselves with tests that the dealer can deal itself cards, determine its hand total, and make a play separately, but there's no code to tie this together. Since we are emphasizing test-driven development, let's add a test for this behavior.

Scenario: A Dealer can always play
  Given a dealer
  When the round starts
  Then the dealer chooses a play

We already wrote steps for the "given" and "when" statements, but we need to add a step for "the dealer chooses a play." Add this new step, and be sure to order it properly:

@then('the dealer gives itself two cards')
def step_impl(context):
    assert (len(context.dealer.hand) == 2)

## NEW STEP
@then('the dealer chooses a play')
def step_impl(context):
    assert (context.dealer.make_play() in ['stand', 'hit'])


@then('the {total:d} is correct')
def step_impl(context, total):
    assert (context.dealer_total == total)

This test relies on a new method make_play() that you should now add to the Dealer class:

def make_play(self):
    return self.determine_play(self.get_hand_total())

This method isn't critical, but makes it easier to use the Dealer class.

If you've done everything correctly, running behave should display all of the tests and give a summary similar to this:

1 feature passed, 0 failed, 0 skipped
16 scenarios passed, 0 failed, 0 skipped
48 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.007s

Conclusion

This tutorial walked you through setting up a new project with the Behave library and using test-driven development to build the code based on behavioral tests.

If you would like to get experience writing more tests with this project, try implementing a Player class and player.feature that plays with some basic strategy.

To learn more about BDD and why you might want to adopt it, check out our article on Behavior-Driven Development.

This article is brought with ❤ to you by Semaphore.

29 Jul 2016 8:47am GMT

Glyph Lefkowitz: Don’t Trust Sourceforge, Ever

If you use a computer and you use the Internet, chances are you'll eventually find some software that, for whatever reason, is still hosted on Sourceforge. In case you're not familiar with it, Sourceforge is a publicly-available malware vector that also sometimes contains useful open source binary downloads, especially for Windows.


In addition to injecting malware into their downloads (a practice they claim, hopefully truthfully, to have stopped), Sourceforge also presents an initial download page over HTTPS, then redirects the user to HTTP for the download itself, snatching defeat from the jaws of victory. This is fantastically irresponsible, especially for a site offering un-sandboxed binaries for download, especially in the era of Let's Encrypt where getting a TLS certificate takes approximately thirty seconds and exactly zero dollars.

So: if you can possibly find your downloads anywhere else, go there.


But, rarely, you will find yourself at the mercy of whatever responsible stewards[1] are still operating Sourceforge if you want to get access to some useful software. As it happens, there is a loophole that will let you authenticate the binaries that you download from them so you won't be left vulnerable to an evil barista: their "file release system", the thing you use to upload your projects, will allow you to download other projects as well.

To use it, first make yourself a Sourceforge account. You may need to create a dummy project as well. Sourceforge maintains an HTTPS-accessible list of key fingerprints for all the SSH servers that they operate, so you can verify the public key below.

Then you'll need to connect to their upload server over SFTP, and go to the path /home/frs/project/<the project's name>/.../ to get the file.

I have written a little Python script[2] that automates the translation of a Sourceforge file-browser download URL, the kind you can get if you right-click on a download in the "files" section of a project's website, and runs the relevant scp command to retrieve the file for you. This isn't on PyPI or anything, and I'm not putting any effort into polishing it further; the best possible outcome of this blog post is that it immediately stops being necessary.


  1. Are you one of those people? I would prefer to be lauding your legacy of decades of valuable contributions to the open source community instead of ridiculing your dangerous incompetence, but repeated bug reports and support emails have gone unanswered. Please get in touch so we can discuss this.

  2. Code:

    #!/usr/bin/env python2
    
    import sys
    import os
    
    sfuri = sys.argv[1]
    
    # for example,
    # https://sourceforge.net/projects/refind/files/0.9.2/refind-bin-0.9.2.zip/download
    
    import re
    matched = re.match(
        r"https://sourceforge.net/projects/(.*)/files/(.*)/download",
        sfuri
    )
    
    if not matched:
        sys.stderr.write("Not a SourceForge download link.\n")
        sys.exit(1)
    
    project, path = matched.groups()
    
    sftppath = "/home/frs/project/{project}/{path}".format(project=project, path=path)
    
    def knows_about_web_sf_net():
        with open(
                os.path.expanduser("~/.ssh/known_hosts"), "rb"
        ) as read_known_hosts:
            data = read_known_hosts.read().split("\n")
            for line in data:
                # skip blank lines, which would otherwise crash line.split()[0]
                if line and 'web.sourceforge.net' in line.split()[0]:
                    return True
        return False
    
    sfkey = """
    web.sourceforge.net ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA2uifHZbNexw6cXbyg1JnzDitL5VhYs0E65Hk/tLAPmcmm5GuiGeUoI/B0eUSNFsbqzwgwrttjnzKMKiGLN5CWVmlN1IXGGAfLYsQwK6wAu7kYFzkqP4jcwc5Jr9UPRpJdYIK733tSEmzab4qc5Oq8izKQKIaxXNe7FgmL15HjSpatFt9w/ot/CHS78FUAr3j3RwekHCm/jhPeqhlMAgC+jUgNJbFt3DlhDaRMa0NYamVzmX8D47rtmBbEDU3ld6AezWBPUR5Lh7ODOwlfVI58NAf/aYNlmvl2TZiauBCTa7OPYSyXJnIPbQXg6YQlDknNCr0K769EjeIlAfY87Z4tw==
    """
    
    if not knows_about_web_sf_net():
        with open(
                os.path.expanduser("~/.ssh/known_hosts"), "ab"
        ) as append_known_hosts:
            append_known_hosts.write(sfkey)
    cmd = "scp web.sourceforge.net:{sftppath} .".format(sftppath=sftppath)
    print(cmd)
    os.system(cmd)
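
    For example, if you save the script as sf_scp.py (a filename I've made up) and hand it the rEFInd URL from the comment above, it will print and run the matching scp command:

    python2 sf_scp.py 'https://sourceforge.net/projects/refind/files/0.9.2/refind-bin-0.9.2.zip/download'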
    

29 Jul 2016 6:06am GMT

tryexceptpass: Great! Glad to hear I inspired some creativity!!!

I'll be posting some project links soon so folks can start contributing if interested.


29 Jul 2016 4:24am GMT

tryexceptpass: I’ve seen flexx, but I want to try to do it in native Python without transpiling.


29 Jul 2016 4:21am GMT

tryexceptpass: Yes, I was made aware of remi after my first article.

I think it's great to see that other folks have gone down a similar path… I'm mostly doing this as an exercise in working with websockets…


29 Jul 2016 4:19am GMT

28 Jul 2016

feedPlanet Python

Marcos Dione: osm-centerlines-two-days-after

In the last two days I've been expanding osm-centerlines. It now supports not only ways more complex than a simple rectangle, but also ones that lead to 'branches' (unfortunately, most probably because the mapper either imported bad data or mapped it by hand). Still, I tested it on very complex polygons and the result is not pretty. There is still lots of room for improvement.

Unluckily, it's not as standalone as it could be. The problem is that, so far, the algos force you to provide not only the polygon you want to process, but also its skeleton and medial. The code extends the medial using info extracted from the skeleton in such a way that the resulting medial ends on a segment of the polygon, hopefully the one(s) that cross from one riverbank to the other at the downstream and upstream ends. Calculating the skeleton could be performed by CGAL, but the current Python binding doesn't include that function yet. As for the medial, SFCGAL (a C++ wrapper for CGAL) exports a function that calculates an approximate medial, but there seem to be no Python bindings for it yet.

So a partial solution is to use PostGIS-2.2's ST_StraightSkeleton() and ST_ApproximateMedialAxis(), and to that end I added a function called skeleton_medial_from_postgis(). The parameters are a psycopg2 connection to a PostgreSQL+PostGIS database and the OSM way you want to process, as a shapely.geometry, and it returns the skeleton and the medial ready to be fed into extend_medials(). The result of that should be ready for mapping.
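
A minimal sketch of how that might be wired up (the function names come from the post, but the import path and exact signatures are my assumptions):

import psycopg2
from shapely.geometry import Polygon

# hypothetical import path; adjust to wherever osm-centerlines lives
from osm_centerlines import skeleton_medial_from_postgis, extend_medials

conn = psycopg2.connect("dbname=gis")

# toy riverbank polygon standing in for a real OSM way
way = Polygon([(0, 0), (10, 0), (10, 2), (0, 2)])

# PostGIS does the ST_StraightSkeleton()/ST_ApproximateMedialAxis() work
skeleton, medial = skeleton_medial_from_postgis(conn, way)

# extend the medial so it ends on the polygon's up/downstream segments
centerline = extend_medials(way, skeleton, medial)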

So there's that. I'll be trying to improve it in the next days, and start looking into converting it into a JOSM plugin.


openstreetmap gis python

28 Jul 2016 10:39pm GMT

Continuum Analytics News: Dask and scikit-learn: a 3-Part Tutorial

Developer Blog

Posted Thursday, July 28, 2016
Jim Crist

Continuum Analytics

Dask core contributor Jim Crist has put together a series of posts discussing some recent experiments combining Dask and scikit-learn on his blog, Marginally Stable. From these experiments, a small library has been built up, and can be found here.

The tutorial spans three posts, which cover model parallelism, data parallelism, and combining the two on a real-life dataset.

Part I: Dask & scikit-learn: Model Parallelism

In this post we'll look instead at model-parallelism (using the same data across different models), and dive into a daskified implementation of GridSearchCV.
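
The gist, as a hedged sketch using dask-ml's GridSearchCV (a later descendant of the experimental code in these posts, not the exact 2016 package):

# drop-in for sklearn's GridSearchCV: every fit becomes a node in a dask
# graph, so independent fits across the parameter grid run in parallel
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from dask_ml.model_selection import GridSearchCV

X, y = make_classification(n_samples=1000, random_state=0)
param_grid = {"C": [0.1, 1.0, 10.0], "kernel": ["rbf", "linear"]}

search = GridSearchCV(SVC(), param_grid)  # same API as scikit-learn's
search.fit(X, y)
print(search.best_params_)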

[Animation: grid search task schedule]

Part II: Dask & scikit-learn: Data Parallelism

In the last post we discussed model-parallelism - fitting several models across the same data. In this post we'll look into simple patterns for data-parallelism, which will allow fitting a single model on larger datasets.
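
The underlying pattern, sketched here with plain numpy chunks (the post does the chunking with dask arrays, but the estimator-side idea is the same):

import numpy as np
from sklearn.linear_model import SGDClassifier

rng = np.random.RandomState(0)
clf = SGDClassifier()
classes = np.array([0, 1])

# each iteration stands in for one chunk of a larger-than-memory dataset
for _ in range(10):
    X_chunk = rng.randn(1000, 20)
    y_chunk = (X_chunk[:, 0] > 0).astype(int)
    clf.partial_fit(X_chunk, y_chunk, classes=classes)

X_test = rng.randn(200, 20)
print(clf.score(X_test, (X_test[:, 0] > 0).astype(int)))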


Part III: Dask & scikit-learn: Putting it All Together

In this post we'll combine the above concepts to do distributed learning and grid search on a real dataset: the airline dataset, which contains information on every flight in the USA between 1987 and 2008.

[Animation: distributed grid search in the dask web UI]

Keep up with Jim and his blog by following him on Twitter, @jiminy_crist.

28 Jul 2016 5:34pm GMT

Automating OSINT: Dark Web OSINT With Python and OnionScan: Part One

You may have heard of this awesome tool called OnionScan that is used to scan hidden services in the dark web looking for potential data leaks. Recently the project released some cool visualizations and a high level description of what their scanning results looked like. What they didn't provide is how to actually go about scanning as much of the dark web as possible, and then how to produce those very cool visualizations that they show.

At a high level we need to do the following:

  1. Setup a server somewhere to host our scanner 24/7 because it takes some time to do the scanning work.
  2. Get TOR running on the server.
  3. Get OnionScan setup.
  4. Write some Python to handle the scanning and some of the other data management to deal with the scan results.
  5. Write some more Python to make some cool graphs. (Part Two of the series)

Let's get started!

Setting up a Digital Ocean Droplet

If you already use Amazon, or have your own Linux server somewhere, you can skip this step. For the rest of you, you can use my referral link here to get a $10 credit with Digital Ocean that will get you a couple months free (full disclosure: I make money in my Digital Ocean account if you start paying for your server; feel free to bypass that referral link and pay for your own server). I am assuming you are running Ubuntu 16.04 for the rest of the instructions.

  1. The first thing you need to do is to create a new Droplet by clicking on the big Create Droplet button.
  2. Next select a Ubuntu 16.04 configuration, and select the $5.00/month option (unless you want something more powerful).
  3. You can pick a datacenter wherever you like, and then scroll to the bottom and click Create.

It will begin creating your droplet, and soon you should receive an email with how to access your new Linux server. If you are on Mac OSX or Linux get your terminal open. If you are on Windows then grab Putty from here.

Now you are going to SSH into your new server. Windows Putty users: just punch in the IP address you received in your email and hit Enter. You will be authenticating as the root user, using the password you were provided in your email.

For Mac OSX and Linux people you will type the following into your terminal:

ssh root@IPADDRESS

You will be forced to enter your password a second time, and then you have to change your password. Once that is done, you should be logged into your server.

Installing Prerequisites

Now we need to install the prerequisites for our upcoming code and for OnionScan. Follow each of these steps carefully; the instructions are the same for Mac OSX, Linux, and Windows because the commands are all run on the server.

Feel free to copy and paste each command instead of typing it out. Hit Enter on your keyboard after each step and watch for any problems or errors.

screen
apt-get update
apt-get install tor git bison libexif-dev
apt-get install python-pip
pip install stem

Now we need to install the Go requirements (OnionScan is written in Go). The following instructions are from Ryan Frankel's post here.

bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer)
[[ -s "$HOME/.gvm/scripts/gvm" ]] && source "$HOME/.gvm/scripts/gvm"
source /root/.gvm/scripts/gvm
gvm install go1.4 --binary
gvm use go1.4

Ok beauty we have Go installed. Now let's get OnionScan setup by entering the following:

go get github.com/s-rah/onionscan
go install github.com/s-rah/onionscan

Now if you just type:

onionscan

And hit Enter, you should get the onionscan command line usage information. If this all worked, then you have successfully installed OnionScan. If for some reason you close your terminal and can't run the onionscan binary anymore, simply do a:

gvm use go1.4

and it will fix it for you.

Now we need to make a small modification to the TOR configuration to allow our Python script to request a new identity (a new IP address) which we will use when we run into scanning trouble later on. We have to enable this by doing the following:

tor --hash-password PythonRocks

This will give you output that ends with a line that looks like this:

16:3E73307B3E434914604C25C498FBE5F9B3A3AE2FB97DAF70616591AAF8

Copy this line and then type:

nano -w /etc/tor/torrc

This will open a simple text editor. Now go to the bottom of the file by hitting the following keystrokes (or endlessly scrolling down):

CTRL+W CTRL+V

Paste in the following values at the bottom of the file:

ControlPort 9051
ControlListenAddress 127.0.0.1
HashedControlPassword 16:3E73307B3E434914604C25C498FBE5F9B3A3AE2FB97DAF70616591AAF8

Now hit CTRL+O to write the file and CTRL+X to exit the file editor. Now type:

service tor restart

This will restart TOR and it should have our new settings in place. Note that if you want to use a password other than PythonRocks you will have to follow the steps above substituting your own password in place, and you will also have to later change the associated Python code.
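
Before moving on, you can sanity-check the control port from a Python shell with stem (this assumes the PythonRocks password from above):

from stem.control import Controller

with Controller.from_port(port=9051) as torcontrol:
    torcontrol.authenticate("PythonRocks")
    print "[*] Connected to TOR %s" % torcontrol.get_version()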

We are almost ready to start writing some code. The last step is to grab my list of .onion addresses (at last count around 7182 addresses) so that your script has a starting point to start scanning hidden services.

wget https://raw.githubusercontent.com/automatingosint/osint_public/master/onionrunner/onion_master_list.txt

Whew! We are all set up and ready to start punching out some code. At this point you can switch to your local machine or, if you are comfortable writing code on a Linux server, by all means go for it. I personally find it easier to use WingIDE on my local machine.

A Note About Screen

You'll notice that in the setup instructions I have you run the screen command. This is a handy way to keep your session alive even if you get disconnected from your server. When you want to jump back into that session, you simply SSH back into the server and execute:

screen -rx

This will be handy later on when you start doing your scanning work, as it can take days for it to complete fully.

Writing an OnionScan Wrapper

OnionScan is a great tool but we need to be able to systematically control it, and process the results. As well, TOR connections are notoriously unstable so we need a way to kill a stuck scan process and grab a fresh IP address from the TOR network. Let's get coding! Crack open a new Python file, name it onionrunner.py and start punching out the following (you can download the full code here).

from stem.control import Controller
from stem import Signal
from threading import Timer
from threading import Event

import codecs
import json
import os
import random
import subprocess
import sys
import time

onions         = []
session_onions = []

identity_lock  = Event()
identity_lock.set()

Now we have to build some helper functions that deal with loading our master list of onions and with adding newly discovered onions to that list:

#
# Grab the list of onions from our master list file.
#
def get_onion_list():
        
        # open the master list
        if os.path.exists("onion_master_list.txt"):
        
                with open("onion_master_list.txt","rb") as fd:

                        stored_onions = fd.read().splitlines()  
        else:
                print "[!] No onion master list. Download it!"
                sys.exit(0)
        
        print "[*] Total onions for scanning: %d" % len(stored_onions)

        return stored_onions
#
# Stores an onion in the master list of onions.
#
def store_onion(onion):
        
        print "[++] Storing %s in master list." % onion
        
        with codecs.open("onion_master_list.txt","ab",encoding="utf8") as fd:
                fd.write("%s\n" % onion)

        return
        
#
# Handle new onions.
#
def add_new_onions(new_onion_list):

        global onions
        global session_onions

        for linked_onion in new_onion_list:

                if linked_onion not in onions and linked_onion.endswith(".onion"):

                        print "[++] Discovered new .onion => %s" % linked_onion

                        onions.append(linked_onion)
                        session_onions.append(linked_onion)
                        random.shuffle(session_onions)
                        store_onion(linked_onion)

        return

Now we will implement the function that deals with running the onionscan binary to do the actual scanning work. Keep adding code in your editor:

#
# Runs onion scan as a child process.
#               
def run_onionscan(onion):
        
        print "[*] Onionscanning %s" % onion
        
        # fire up onionscan
        process = subprocess.Popen(["onionscan","--jsonReport","--simpleReport=false",onion],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        
        # start the timer and let it run 5 minutes
        process_timer = Timer(300,handle_timeout,args=[process,onion])
        process_timer.start()

        # wait for the onion scan results
        stdout = process.communicate()[0]

        # we have received valid results so we can kill the timer 
        if process_timer.is_alive():
                process_timer.cancel()
                return stdout

        print "[!!!] Process timed out!"        

        return None

So there you have a neat trick to deal with some timing issues when running command line binaries. Now let's implement the actual timeout handling function that deals with killing the OnionScan process and requesting a new IP address from the TOR network. Below it comes the process_results() function, which writes out each scan's JSON report and harvests newly discovered .onion addresses, and the main loop that drives the whole scan. Keep on adding code:

#
# Handle a timeout from the onionscan process.
#
def handle_timeout(process,onion):

        global session_onions
        global identity_lock 

        # halt the main thread while we grab a new identity
        identity_lock.clear()

        # kill the onionscan process
        try:
                process.kill()
                print "[!!!] Killed the onionscan process."
        except:
                pass

        # Now we switch TOR identities to make sure we have a good connection
        with Controller.from_port(port=9051) as torcontrol:

                # authenticate to our local TOR controller
                torcontrol.authenticate("PythonRocks")

                # send the signal for a new identity
                torcontrol.signal(Signal.NEWNYM)

                # wait for the new identity to be initialized
                time.sleep(torcontrol.get_newnym_wait())

                print "[!!!] Switched TOR identities."

        # push the onion back on to the list    
        session_onions.append(onion)
        random.shuffle(session_onions)

        # allow the main thread to resume executing
        identity_lock.set()     

        return


#
# Processes the JSON result from onionscan.
#
def process_results(onion,json_response):
        global onions
        global session_onions

        # create our output folder if necessary
        if not os.path.exists("onionscan_results"):
                os.mkdir("onionscan_results")

        # write out the JSON results of the scan
        with open("%s/%s.json" % ("onionscan_results",onion), "wb") as fd:
                fd.write(json_response)

        # look for additional .onion domains to add to our scan list
        scan_result = ur"%s" % json_response.decode("utf8")
        scan_result = json.loads(scan_result)
        
        if scan_result['linkedSites'] is not None:
                add_new_onions(scan_result['linkedSites'])              
                
        if scan_result['relatedOnionDomains'] is not None:
                add_new_onions(scan_result['relatedOnionDomains'])
                
        if scan_result['relatedOnionServices'] is not None:
                add_new_onions(scan_result['relatedOnionServices'])
                

        return

#
# Handle new onions.
#
def add_new_onions(new_onion_list):

        global onions
        global session_onions

        for linked_onion in new_onion_list:

                if linked_onion not in onions and linked_onion.endswith(".onion"):

                        print "[++] Discovered new .onion => %s" % linked_onion

                        onions.append(linked_onion)
                        session_onions.append(linked_onion)
                        random.shuffle(session_onions)
                        store_onion(linked_onion)

        return

# get a list of onions to process
onions = get_onion_list()

# randomize the list a bit
random.shuffle(onions)
session_onions = list(onions)

count = 0

while count < len(onions):

        # if the event is cleared we will halt here
        # otherwise we continue executing
        identity_lock.wait()

        # grab a new onion to scan
        print "[*] Running %d of %d." % (count,len(onions))
        onion  = session_onions.pop()
        
        # test to see if we have already retrieved results for this onion
        if os.path.exists("onionscan_results/%s.json" % onion):

                print "[!] Already retrieved %s. Skipping." % onion
                count += 1

                continue

        # run the onion scan    
        result = run_onionscan(onion)

        # process the results
        if result is not None:
                
                if len(result):
                        process_results(onion,result)           

        count += 1

Now we need to implement the function that will handle processing the JSON results that OnionScan hands back to us. March on, good Python soldier:

#
# Processes the JSON result from onionscan.
#
def process_results(onion,json_response):

        global onions
        global session_onions

        # create our output folder if necessary
        if not os.path.exists("onionscan_results"):
                os.mkdir("onionscan_results")

        # write out the JSON results of the scan
        with open("%s/%s.json" % ("onionscan_results",onion), "wb") as fd:
                fd.write(json_response)

        # look for additional .onion domains to add to our scan list
        scan_result = ur"%s" % json_response.decode("utf8")
        scan_result = json.loads(scan_result)

        if scan_result['linkedSites'] is not None:
                add_new_onions(scan_result['linkedSites'])

        if scan_result['relatedOnionDomains'] is not None:
                add_new_onions(scan_result['relatedOnionDomains'])

        if scan_result['relatedOnionServices'] is not None:
                add_new_onions(scan_result['relatedOnionServices'])

        return

You'll notice that for each set of discovered onions we call the add_new_onions function. Let's implement that function now.

#
# Handle new onions.
#
def add_new_onions(new_onion_list):

        global onions
        global session_onions

        for linked_onion in new_onion_list:

                if linked_onion not in onions and linked_onion.endswith(".onion"):

                        print "[++] Discovered new .onion => %s" % linked_onion

                        onions.append(linked_onion)
                        session_onions.append(linked_onion)
                        random.shuffle(session_onions)
                        store_onion(linked_onion)

        return

Now let's start putting the finishing touches on this script, beginning with loading and shuffling our master list of onions:

# get a list of onions to process
onions = get_onion_list()

# randomize the list a bit
random.shuffle(onions)
session_onions = list(onions)

count = 0

Now it's time to put the main loop in place that will be responsible for kickstarting OnionScan for each hidden service that we have stored.

while count < len(onions):

        # if the event is cleared we will halt here
        # otherwise we continue executing
        identity_lock.wait()

        # grab a new onion to scan
        print "[*] Running %d of %d." % (count,len(onions))
        onion = session_onions.pop()

        # test to see if we have already retrieved results for this onion
        if os.path.exists("onionscan_results/%s.json" % onion):

                print "[!] Already retrieved %s. Skipping." % onion
                count += 1

                continue

        # run the onion scan
        result = run_onionscan(onion)

        # process the results
        if result is not None:

                if len(result):
                        process_results(onion,result)

        count += 1

Whew! That is a lot of code, but hopefully you have learned a few new Python coding tricks along the way, and it may give you ideas on how to wrap other scanning software the same way we wrapped OnionScan. Now for the moment of truth…

Let it Rip!

Now you are ready to start scanning! Simply run:

python onionrunner.py

And you should start seeing output like the following:

# python onionrunner.py
[*] Total onions for scanning: 7182
[*] Running 0 of 7182.
[*] Onionscanning nfokjthabqzfndmj.onion
[*] Running 1 of 7182.
[*] Onionscanning gmts3xxfrbfxdm3a.onion


If you check the onionscan_results directory you should see JSON files named after each hidden service that was scanned. Let this puppy run for as long as you can tolerate; in the second post we are going to process these JSON files and begin to create some visualizations. For bonus points you can also push those JSON files into Elasticsearch (or modify onionrunner.py to do so on the fly) and analyze the results using Kibana!
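
If you want to try the Elasticsearch route, a minimal sketch might look like the following. It assumes a local Elasticsearch node and the elasticsearch Python package (pip install elasticsearch); the onionscan index and result document type are made-up names, so adjust to taste:

from elasticsearch import Elasticsearch
import json
import os

es = Elasticsearch()

for name in os.listdir("onionscan_results"):

        with open(os.path.join("onionscan_results", name)) as fd:
                doc = json.load(fd)

        # use the hidden service name (the filename minus .json) as the document ID
        es.index(index="onionscan", doc_type="result", id=name[:-5], body=doc)

From there you can point Kibana at the onionscan index and start building dashboards.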

If you don't want to wait to get all of the data yourself, you can download the scan results for 8,167 onions from here.

28 Jul 2016 1:39pm GMT

Automating OSINT: Dark Web OSINT With Python and OnionScan: Part One

You may have heard of this awesome tool called OnionScan that is used to scan hidden services in the dark web looking for potential data leaks. Recently the project released some cool visualizations and a high level description of what their scanning results looked like. What they didn't provide is how to actually go about scanning as much of the dark web as possible, and then how to produce those very cool visualizations that they show.

At a high level we need to do the following:

  1. Set up a server somewhere to host our scanner 24/7, because it takes some time to do the scanning work.
  2. Get TOR running on the server.
  3. Get OnionScan set up.
  4. Write some Python to handle the scanning and some of the other data management to deal with the scan results.
  5. Write some more Python to make some cool graphs. (Part Two of the series)

Let's get started!

Setting up a Digital Ocean Droplet

If you already use Amazon, or have your own Linux server somewhere, you can skip this step. For the rest of you, you can use my referral link here to get a $10 credit with Digital Ocean, which will get you a couple of months free (full disclosure: I make money in my Digital Ocean account if you start paying for your server, so feel free to bypass that referral link and pay for your own server). I am assuming you are running Ubuntu 16.04 for the rest of the instructions.

  1. The first thing you need to do is to create a new Droplet by clicking on the big Create Droplet button.
  2. Next select an Ubuntu 16.04 configuration and the $5.00/month option (unless you want something more powerful).
  3. You can pick a datacenter wherever you like, and then scroll to the bottom and click Create.

It will begin creating your droplet, and soon you should receive an email with how to access your new Linux server. If you are on Mac OSX or Linux, get your terminal open. If you are on Windows, grab Putty from here.

Now you are going to SSH into your new server. Windows Putty users: just punch in the IP address that you received in your email and hit Enter. You will be authenticating as the root user, and then type in the password you were provided in your email.

For Mac OSX and Linux people you will type the following into your terminal:

ssh root@IPADDRESS

You will be forced to enter your password a second time, and then you have to change it. Once that is done you should be logged into your server.

Installing Prerequisites

Now we need to install the prerequisites for our upcoming code and for OnionScan. Follow each of these steps carefully; the instructions are the same for Mac OSX, Linux and Windows because the commands are all run on the server.

Feel free to copy and paste each command instead of typing it out. Hit Enter on your keyboard after each step and watch for any problems or errors.

screen
apt-get update
apt-get install tor git bison libexif-dev
apt-get install python-pip
pip install stem

Now we need to install the Go requirements (OnionScan is written in Go). The following instructions are from Ryan Frankel's post here.

bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer)
[[ -s "$HOME/.gvm/scripts/gvm" ]] && source "$HOME/.gvm/scripts/gvm"
source /root/.gvm/scripts/gvm
gvm install go1.4 --binary
gvm use go1.4

Ok beauty, we have Go installed. Now let's get OnionScan set up by entering the following:

go get github.com/s-rah/onionscan
go install github.com/s-rah/onionscan

Now if you just type:

onionscan

And hit Enter, and you should get the onionscan command line usage information. If this all worked, then you have successfully installed OnionScan. If for some reason you close your terminal and can't run the onionscan binary anymore, simply do a:

gvm use go1.4

and it will fix it for you.

Now we need to make a small modification to the TOR configuration to allow our Python script to request a new identity (a new IP address), which we will use when we run into scanning trouble later on. We enable this by doing the following:

tor --hash-password PythonRocks

This will give you output whose last line looks like this:

16:3E73307B3E434914604C25C498FBE5F9B3A3AE2FB97DAF70616591AAF8

Copy this line and then type:

nano -w /etc/tor/torrc

This will open a simple text editor. Now go to the bottom of the file by hitting the following keystrokes (or endlessly scrolling down):

CTRL+W CTRL+V

Paste in the following values at the bottom of the file:

ControlPort 9051
ControlListenAddress 127.0.0.1
HashedControlPassword 16:3E73307B3E434914604C25C498FBE5F9B3A3AE2FB97DAF70616591AAF8

Now hit CTRL+O to write the file and CTRL+X to exit the file editor. Now type:

service tor restart

This will restart TOR with our new settings in place. Note that if you want to use a password other than PythonRocks, you will have to follow the steps above substituting in your own password, and you will also have to change the associated Python code later on.
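
Before moving on, it's worth verifying that the control port actually works. Here is a quick standalone check using the same stem calls our script will rely on (it assumes you kept the PythonRocks password):

from stem.control import Controller
from stem import Signal

# connect to the control port we just configured and request a new identity
with Controller.from_port(port=9051) as torcontrol:

        torcontrol.authenticate("PythonRocks")
        torcontrol.signal(Signal.NEWNYM)

        print "[*] Control port is up and accepted our password."

If this runs without raising an exception, TOR is configured correctly.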

We are almost ready to start writing some code. The last step is to grab my list of .onion addresses (at last count, 7,182 addresses) so that your script has a starting point for scanning hidden services.

wget https://raw.githubusercontent.com/automatingosint/osint_public/master/onionrunner/onion_master_list.txt

Whew! We are all set up and ready to start punching out some code. At this point you can switch to your local machine, or if you are comfortable writing code on a Linux server, by all means go for it. Personally, I find it easier to use WingIDE on my local machine.

A Note About Screen

You'll notice that in the instructions above I have you run the screen command. This is a handy way to keep your session alive even if you get disconnected from your server. When you want to jump back into that session, simply SSH back into the server and execute:

screen -rx

This will be handy later on when you start doing your scanning work, as it can take days for it to complete fully.

Writing an OnionScan Wrapper

OnionScan is a great tool, but we need to be able to systematically control it and process the results. As well, TOR connections are notoriously unstable, so we need a way to kill a stuck scan process and grab a fresh IP address from the TOR network. Let's get coding! Crack open a new Python file, name it onionrunner.py and start punching out the following (you can download the full code here):

from stem.control import Controller
from stem import Signal
from threading import Timer
from threading import Event

import codecs
import json
import os
import random
import subprocess
import sys
import time

onions         = []
session_onions = []

identity_lock  = Event()
identity_lock.set()
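
A quick aside: identity_lock is a threading.Event that we will use as a gate. Our main scanning loop will call identity_lock.wait(), which returns immediately while the event is set; a background thread can clear() it to pause the loop and set() it again to resume, which is exactly what our timeout handler will do later on. If Event is new to you, here is a tiny standalone demo of that gate pattern (just an illustration, not part of onionrunner.py):

from threading import Event, Timer
import time

gate = Event()
gate.set()                      # gate open: wait() returns immediately

def close_gate_briefly():

        gate.clear()            # gate closed: wait() now blocks
        time.sleep(2)
        gate.set()              # gate open again: waiters resume

Timer(1, close_gate_briefly).start()

for i in range(5):

        gate.wait()             # pauses here while the gate is closed
        print "[*] Tick %d" % i
        time.sleep(1)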

Now we have to build some helper functions that deal with loading our master list of onions and with adding newly discovered onions to that list:

#
# Grab the list of onions from our master list file.
#
def get_onion_list():

        # open the master list
        if os.path.exists("onion_master_list.txt"):

                with open("onion_master_list.txt","rb") as fd:

                        stored_onions = fd.read().splitlines()
        else:
                print "[!] No onion master list. Download it!"
                sys.exit(0)

        print "[*] Total onions for scanning: %d" % len(stored_onions)

        return stored_onions

#
# Stores an onion in the master list of onions.
#
def store_onion(onion):

        print "[++] Storing %s in master list." % onion

        with codecs.open("onion_master_list.txt","ab",encoding="utf8") as fd:
                fd.write("%s\n" % onion)

        return
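
One small caveat: store_onion only ever appends, so if the master list ever picks up duplicate entries (from hand editing, for example) they will waste iterations of our scanning loop. An optional helper, not part of the original script, that rewrites the file with duplicates removed while preserving order might look like this:

# optional: rewrite onion_master_list.txt with duplicates removed
def dedupe_master_list():

        seen   = set()
        unique = []

        for onion in get_onion_list():
                if onion not in seen:
                        seen.add(onion)
                        unique.append(onion)

        with open("onion_master_list.txt","wb") as fd:
                fd.write("\n".join(unique) + "\n")

        return unique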

Now we will implement the function that deals with running the onionscan binary to do the actual scanning work. Keep adding code in your editor:

#
# Runs onion scan as a child process.
#
def run_onionscan(onion):

        print "[*] Onionscanning %s" % onion

        # fire up onionscan
        process = subprocess.Popen(["onionscan","--jsonReport","--simpleReport=false",onion],stdout=subprocess.PIPE,stderr=subprocess.PIPE)

        # start the timer and let it run 5 minutes
        process_timer = Timer(300,handle_timeout,args=[process,onion])
        process_timer.start()

        # wait for the onion scan results
        stdout = process.communicate()[0]

        # we have received valid results so we can kill the timer
        if process_timer.is_alive():
                process_timer.cancel()
                return stdout

        print "[!!!] Process timed out!"

        return None
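
Stripped of the OnionScan specifics, this watchdog pattern works for any command line binary. A minimal standalone sketch (the sleep command stands in for a slow process; this is an illustration, not part of onionrunner.py):

import subprocess
from threading import Timer

def run_with_timeout(args, seconds):

        process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # the watchdog only fires if the process is still running after `seconds`
        watchdog = Timer(seconds, process.kill)
        watchdog.start()

        stdout = process.communicate()[0]

        # the watchdog never fired, so the process finished in time
        if watchdog.is_alive():
                watchdog.cancel()
                return stdout

        return None

# a 10 second sleep killed after 2 seconds returns None
print run_with_timeout(["sleep", "10"], 2)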

So there you have a neat trick to deal with some timing issues when running command line binaries. Now let's implement the actual timeout handling function that deals with killing OnionScan and requesting a new IP address from the Tor network. Keep on adding code:

#
# Handle a timeout from the onionscan process.
#
def handle_timeout(process,onion):

        global session_onions
        global identity_lock

        # halt the main thread while we grab a new identity
        identity_lock.clear()

        # kill the onionscan process
        try:
                process.kill()
                print "[!!!] Killed the onionscan process."
        except:
                pass

        # Now we switch TOR identities to make sure we have a good connection
        with Controller.from_port(port=9051) as torcontrol:

                # authenticate to our local TOR controller
                torcontrol.authenticate("PythonRocks")

                # send the signal for a new identity
                torcontrol.signal(Signal.NEWNYM)

                # wait for the new identity to be initialized
                time.sleep(torcontrol.get_newnym_wait())

                print "[!!!] Switched TOR identities."

        # push the onion back on to the list
        session_onions.append(onion)
        random.shuffle(session_onions)

        # allow the main thread to resume executing
        identity_lock.set()

        return

That takes care of running scans; now let's walk through process_results, the function that handles the JSON report OnionScan hands back to us. It writes the raw report out to onionscan_results/<onion>.json and then inspects the linkedSites, relatedOnionDomains and relatedOnionServices fields for fresh hidden services to feed back into the scan queue. March on, good Python soldier!
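
Those three field names come straight out of the report; the addresses below, on the other hand, are invented purely for illustration. Here is a minimal sketch of what the interesting part of a report looks like and how the non-null discovery fields drive the queue:

import json

# the three keys below are the ones our script actually uses; the
# .onion addresses are made up for this example
sample_report = """
{
    "hiddenService": "examplehiddenxyz.onion",
    "linkedSites": ["someotherhiddenabc.onion"],
    "relatedOnionDomains": null,
    "relatedOnionServices": null
}
"""

scan_result = json.loads(sample_report)

# only non-null discovery fields would get handed to add_new_onions()
for key in ("linkedSites", "relatedOnionDomains", "relatedOnionServices"):
        if scan_result[key] is not None:
                print "[*] %s => %r" % (key, scan_result[key])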


add_new_onions is the bookkeeping side of that discovery step: any address we haven't seen before that ends in .onion gets appended to the master onions list and the current session_onions list, persisted to disk with store_onion, and the session list is reshuffled so that newly discovered services get mixed into the scanning order.
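
As a side note, once the master list grows large, the "linked_onion not in onions" membership test costs a full list scan per lookup. A hedged sketch of a variant that keeps a parallel set for constant-time checks - seen_onions and add_new_onions_fast are our own names, not part of the original script, and the sketch reuses the script's onions, session_onions, store_onion and random globals:

seen_onions = set()        # parallel index of everything in onions, for O(1) lookups

def add_new_onions_fast(new_onion_list):

        for linked_onion in new_onion_list:

                if linked_onion.endswith(".onion") and linked_onion not in seen_onions:

                        print "[++] Discovered new .onion => %s" % linked_onion

                        seen_onions.add(linked_onion)
                        onions.append(linked_onion)
                        session_onions.append(linked_onion)
                        store_onion(linked_onion)

        # shuffle once per batch instead of once per discovered onion
        random.shuffle(session_onions)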


Now for the finishing touch: the timeout handling. run_onionscan arms a five-minute Timer before waiting on the child process; if OnionScan hangs, handle_timeout fires, kills the child, pauses the main loop by clearing identity_lock, and asks the local Tor controller on port 9051 for a fresh identity with the NEWNYM signal. Once the new circuit is up, the onion goes back onto the session list and identity_lock is set so the main thread can resume.
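
One assumption baked into handle_timeout is that Tor's control port is enabled and password-protected with "PythonRocks". If you haven't configured that yet, the relevant torrc lines look something like the following; generate your own hash with tor --hash-password PythonRocks (the hash value below is just a placeholder, yours will differ):

ControlPort 9051
HashedControlPassword 16:YOUR_HASH_GOES_HERE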


Finally, the main loop at the bottom of the script is responsible for kickstarting OnionScan for each hidden service that we have stored. It blocks on identity_lock.wait() while an identity switch is in flight, pops the next onion off the shuffled session list, skips any service we already have a JSON result for on disk, and hands everything else to run_onionscan.

Whew! That is a lot of code, but hopefully you have learned a few new Python coding tricks along the way, and it might give you ideas on how to wrap other scanning software the same way we wrapped OnionScan. Now for the moment of truth…
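
To show how little of the wrapper is OnionScan-specific, here is a minimal sketch of the same subprocess-plus-Timer pattern around an arbitrary command-line scanner. nmap is just an example target here and is not part of the original script; swap in whatever tool you like:

import subprocess
from threading import Timer

def run_scanner(command, timeout=300):

        # fire up the scanner as a child process
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # arm the hard timeout; Timer calls process.kill if it fires
        process_timer = Timer(timeout, process.kill)
        process_timer.start()

        # wait for the scanner's output
        stdout = process.communicate()[0]

        # the scanner finished before the timer fired
        if process_timer.is_alive():
                process_timer.cancel()
                return stdout

        print "[!!!] %s timed out!" % command[0]
        return None

# example: a TCP connect scan of localhost, assuming nmap is installed
result = run_scanner(["nmap", "-sT", "-oX", "-", "127.0.0.1"])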

Let it Rip!

Now you are ready to start scanning! Simply run:

python onionrunner.py

And you should start seeing output like the following:


# python onionrunner.py
[*] Total onions for scanning: 7182
[*] Running 0 of 7182.
[*] Onionscanning nfokjthabqzfndmj.onion
[*] Running 1 of 7182.
[*] Onionscanning gmts3xxfrbfxdm3a.onion


If you check the onionscan_results directory you should see JSON files named after the hidden services that were scanned. Let this puppy run as long as you can tolerate; in the second post we are going to process these JSON files and begin to create some visualizations. For bonus points you can also push those JSON files into Elasticsearch (or modify onionrunner.py to do so on the fly) and analyze the results using Kibana!
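
If you want to try the Elasticsearch route, here is a hedged sketch using the official elasticsearch-py client; the index and type names are our own choices, and we assume a default Elasticsearch node listening on localhost:9200:

import json
import os

from elasticsearch import Elasticsearch

es = Elasticsearch()        # assumes a default node on localhost:9200

for filename in os.listdir("onionscan_results"):

        if not filename.endswith(".json"):
                continue

        with open(os.path.join("onionscan_results", filename), "rb") as fd:
                report = json.load(fd)

        # one document per scanned hidden service
        es.index(index="onionscan", doc_type="report", body=report)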

If you don't want to wait to get all of the data yourself, you can download the scan results for 8,167 onions from here.

28 Jul 2016 1:39pm GMT

10 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: King William's Town Station

Yesterday morning I had to go to the station in KWT to pick up our reserved bus tickets for the Christmas holidays in Cape Town. The station itself has had no train service since December for cost reasons, but Translux and co. - the long-distance bus companies - have their offices there.






© benste CC NC SA

10 Nov 2011 10:57am GMT

09 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein

Nobody is worried about something like this - you just drive through by car, and in the city, near Gnobie: "no, it only gets dangerous once the fire brigade is there" - 30 minutes later, on the way back, the fire brigade was there.




© benste CC NC SA

09 Nov 2011 8:25pm GMT

08 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: Braai Party

Braai = a barbecue evening or something similar.

The would-be technicians patching up their SpeakOn / jack plug junctions...

The ladies - the "mamas" of the settlement - during the official opening speech

Even though fewer people came than expected: loud music and lots of people ...

And of course a fire with real wood for the barbecue.

© benste CC NC SA

08 Nov 2011 2:30pm GMT

07 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: Lumanyano Primary

One of our missions was bringing Katja's Linux Server back to her room. While doing that we saw her new decoration.

Björn and Simphiwe carried the PC to Katja's school


© benste CC NC SA

07 Nov 2011 2:00pm GMT

06 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: Nelisa Haircut

Today I went with Björn to Needs Camp to visit Katja's guest family for a special party. First of all we visited some friends of Nelisa - yeah, the one I'm working with in Quigney, Katja's guest father's sister - who gave her a haircut.

African women usually get their hair done by arranging extensions, not by just cutting some hair like Europeans do.

In between she looked like this...

And then she was done - looks amazing considering the amount of hair she had last week, doesn't it?

© benste CC NC SA

06 Nov 2011 7:45pm GMT

05 Nov 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: My Saturday

Somehow it occurred to me today that I need to restructure my blog posts a bit - if I only ever report on new places, I would have to be on a permanent round trip. So here are a few things from my everyday life today.

First of all: Saturday counts as a day off, at least for us volunteers.

This weekend only Rommel and I are on the farm - Katja and Björn are now at their placements, and my housemates Kyle and Jonathan are at home in Grahamstown, as is Sipho, who lives in Dimbaza.
Robin, Rommel's wife, has been in Woodie Cape since Thursday to take care of a few things there.
Anyway, this morning we treated ourselves to a shared Weetbix/muesli breakfast and then set off for East London. Two things were on the checklist - Vodacom and Ethienne (the estate agent) - plus taking the missing things to NeedsCamp on the way back.

Just after setting off on the dirt road we realized that we had not packed the things for NeedsCamp and Ethienne, but did have the pump for the water supply in the car.

So in East London we first drove to Farmerama - no, not the online game FarmVille, but a shop with all kinds of things for a farm - in Berea, a northern part of town.

At Farmerama we got advice on a quick coupling that should make life with the pump easier, and we also dropped off a lighter pump for repair, so that it is not such a big effort every time the water runs out again.

Fego Caffé is in the Hemmingways Mall; there we had to get the PIN and PUK for one of our data SIM cards, because some digits had unfortunately been transposed when entering the PIN. In any case, shops in South Africa store data as sensitive as a PUK - which essentially gives access to a locked phone.

In the café Rommel then carried out a few online transactions with the 3G modem, which was working again - and which, by the way, now works perfectly in Ubuntu, my Linux system.

Meanwhile I went to 8ta to find out about their new deals, since we want to offer internet in some of Hilltop's centres. The picture shows the UMTS coverage in NeedsCamp, Katja's village. 8ta is a new phone provider from Telkom; after Vodafone bought Telkom's shares in Vodacom, they have to rebuild completely from scratch.
We decided to organize a free prepaid card to test, because who knows how accurate the coverage map above is ... Before signing even the cheapest 24-month deal, you should know whether it works.

After that we went to Checkers in Vincent, looking for two hotplates for Woody Cape - R 129.00 each, so about 12€ for a two-part hotplate.
As you can see in the background, the Christmas decorations are already out - at the beginning of November, and that in South Africa at a sunny, warm 25°C or more.

For lunch we treated ourselves to a Pakistani curry takeaway - highly recommended!
Well, and after we got back an hour or so ago, I cleaned the fridge, which I had simply put outside to defrost this morning. Now it is clean again, and free of its three-metre-thick layer of ice...

Tomorrow ... I will report on that separately ... but probably not until Monday, because then I will be back in Quigney (East London) and have free internet.

© benste CC NC SA

05 Nov 2011 4:33pm GMT

31 Oct 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: Sterkspruit Computer Center

Sterkspruit is one of Hilltop's computer centres in the far north of the Eastern Cape. On the trip to J'burg we used the opportunity to take a look at the centre.

Pupils in the big classroom


The Trainer


School in Countryside


Adult Class in the Afternoon


"Town"


© benste CC NC SA

31 Oct 2011 4:58pm GMT

Benedict Stein: Technical Issues

What do you do in an internet café when your ADSL and fax line have been discontinued before month's end? Well, my idea was to sit outside and eat some ice cream.
At least it's sunny and not as rainy as on the weekend.


© benste CC NC SA

31 Oct 2011 3:11pm GMT

30 Oct 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: Nellis Restaurant

For those who are travelling through Zastron: there is a very nice restaurant serving delicious food at reasonable prices.
In addition they sell home-made juices, jams and honey.




interior


home made specialities - the shop in the shop


the Bar


© benste CC NC SA

30 Oct 2011 4:47pm GMT

29 Oct 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: The way back from J'burg

On the 10-12 hour trip from J'burg back to ELS I was able to take a lot of pictures, including these different roadsides.

Plain Street


Orange River in its beginnings (near Lesotho)


Zastron Anglican Church


The Bridge in Between "Free State" and Eastern Cape next to Zastron


my new Background ;)


If you listen to Google Maps you'll end up travelling 50 km of gravel road - as it had just been renewed we didn't have that many problems, and we saved an hour compared to going the official way with all its construction sites.




Freeway


getting dark


© benste CC NC SA

29 Oct 2011 4:23pm GMT

28 Oct 2011

feedPython Software Foundation | GSoC'11 Students

Benedict Stein: How does a construction site actually work?

Sure, some things may be different and a lot the same - but the road construction site that is an everyday sight in Germany: how does that actually work in South Africa?

First of all - NO, there are no locals digging with their bare hands. Even though more manpower is used here, they are hard at work with machinery.

A perfectly normal "national road"


and how it is being widened


lots and lots of trucks


because here one side is closed completely over a long stretch, which results in a traffic-light arrangement with, in this case, a 45-minute wait


But at least they seem to be having fun ;) - as did we, since luckily we never had to wait longer than 10 minutes.

© benste CC NC SA

28 Oct 2011 4:20pm GMT