1. Tutorial
This tutorial introduces python-workflow by example: we will walk through how to create a simple image crawler. Our crawler will fetch images from https://picsum.photos/ and resize them.
1.1. Getting started
Before we start, we have to install python-workflow.
$ python -m pip install python-workflow
Warning
For this tutorial only, we also need to install Pillow and Requests:
$ python -m pip install pillow requests
We have to create a main.py file and import python-workflow.
Note
You can see our examples on GitHub.
1.2. Defining our tasks
First, we will create two types of Task: CrawlerTask and ResizerTask.
# main.py
import gevent
import requests
from PIL import Image

from python_workflow import Task, Step, Workflow


class CrawlerTask(Task):
    def __init__(self, index=0):
        super().__init__('MyCrawlerTask')
        self.index = index

    def run(self):
        # Fetch a random 600x400 image
        r = requests.get(
            'https://picsum.photos/600/400'
        )
        if r.status_code == 200:
            # Note: the output/ directory must already exist
            with open('output/%s-image.png' % self.index, "wb") as f:
                f.write(r.content)


class ResizerTask(Task):
    def __init__(self, index=0):
        super().__init__('MyResizerTask')
        self.index = index

    def run(self):
        filename = 'output/%s-image.png' % self.index
        final_filename = 'output/%s-thumbnail.png' % self.index
        base_width = 100
        img = Image.open(filename)
        # Scale the height by the same ratio as the width to keep the aspect ratio
        w_percent = (base_width / float(img.size[0]))
        hsize = int((float(img.size[1]) * float(w_percent)))
        img = img.resize((base_width, hsize), Image.LANCZOS)
        img.save(final_filename)
        # Slow the resizing down so the concurrent tasks are visible in the log
        gevent.sleep(1)
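The width-ratio arithmetic used in ResizerTask.run() can be checked in isolation. The sketch below is a standalone illustration and not part of python-workflow; the helper name thumbnail_size is hypothetical. It repeats the same calculation without needing Pillow or an image file.

```python
def thumbnail_size(width, height, base_width=100):
    """Return (base_width, scaled_height), keeping the aspect ratio.

    Hypothetical helper mirroring the arithmetic in ResizerTask.run().
    """
    w_percent = base_width / float(width)
    # int() truncates, so the scaled height is rounded down
    hsize = int(float(height) * w_percent)
    return base_width, hsize


if __name__ == "__main__":
    # A 600x400 source image, the size requested from picsum.photos
    print(thumbnail_size(600, 400))
```

For the 600x400 images fetched by the crawler, this gives a 100x66 thumbnail.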
1.3. Defining our steps
Now that we have our tasks, we will group them into Step objects. All tasks in a Step are threaded.
By default, tasks are processed in batches of 4. You can change this with the nb_thread keyword argument of Step; see __init__().
# main.py
#
# ....
#
tasks = []
for i in range(0, 10):
    tasks.append(
        CrawlerTask(i)
    )
step1 = Step(
    'CrawlingStep',
    tasks=tasks,
)

tasks = []
for i in range(0, 10):
    tasks.append(
        ResizerTask(i)
    )
step2 = Step(
    'ResizingStep',
    tasks=tasks
)
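To picture what processing tasks four at a time means for the ten tasks in each step, here is a minimal pure-Python sketch. It is only an illustration of the grouping, not python-workflow's actual implementation, and the helper name batches is hypothetical.

```python
def batches(items, size=4):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


if __name__ == "__main__":
    task_indexes = list(range(10))
    for batch in batches(task_indexes):
        # Each batch would run concurrently before the next one starts
        print(batch)
```

With ten tasks and a size of 4, this produces groups of 4, 4 and 2 tasks. Assuming nb_thread controls the group size, a different value would correspond to a different size here.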
1.4. Defining our workflow
Finally, pass our Step objects to our Workflow through its __init__().
# main.py
#
# ....
#
w = Workflow(
    'ExampleCrawlerWorkflow',
    steps=[
        step1,
        step2
    ]
)
w.start()
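Both tasks read and write files under output/, and the code above does not create that directory. A minimal sketch for creating it before the workflow starts follows; the helper name ensure_output_dir is hypothetical, and it assumes the directory is resolved relative to the current working directory, as the paths in the tasks are.

```python
from pathlib import Path


def ensure_output_dir(path="output"):
    """Create the directory if it is missing and return it as a Path."""
    directory = Path(path)
    directory.mkdir(parents=True, exist_ok=True)
    return directory


if __name__ == "__main__":
    ensure_output_dir()
    # ... then build the steps and call w.start() as in main.py
```

Calling it a second time is harmless, because exist_ok=True suppresses the error for an existing directory.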
1.5. Running our workflow!
Execute main.py:
$ python main.py
[2024-02-22T16:15:16.246620][ExampleCrawlerWorkflow ] Workflow is starting ... args=(), kwargs={}
[2024-02-22T16:15:16.246708][CrawlingStep ] Step is starting ... args=(), kwargs={}
[2024-02-22T16:15:16.247516][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:16.248300][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:16.248688][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:16.248991][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:18.011087][ MyCrawlerTask] CrawlerTask completed (1.7636s.) args=(), kwargs={}
[2024-02-22T16:15:18.011287][ MyCrawlerTask] CrawlerTask completed (1.763s.) args=(), kwargs={}
[2024-02-22T16:15:18.011369][ MyCrawlerTask] CrawlerTask completed (1.7627s.) args=(), kwargs={}
[2024-02-22T16:15:18.011433][ MyCrawlerTask] CrawlerTask completed (1.7625s.) args=(), kwargs={}
[2024-02-22T16:15:18.011621][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:18.012474][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:18.013083][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:18.013424][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.132523][ MyCrawlerTask] CrawlerTask completed (2.1209s.) args=(), kwargs={}
[2024-02-22T16:15:20.132687][ MyCrawlerTask] CrawlerTask completed (2.1203s.) args=(), kwargs={}
[2024-02-22T16:15:20.132769][ MyCrawlerTask] CrawlerTask completed (2.1197s.) args=(), kwargs={}
[2024-02-22T16:15:20.132841][ MyCrawlerTask] CrawlerTask completed (2.1195s.) args=(), kwargs={}
[2024-02-22T16:15:20.133016][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.133872][MyCrawlerTask ] CrawlerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.886846][ MyCrawlerTask] CrawlerTask completed (0.7539s.) args=(), kwargs={}
[2024-02-22T16:15:20.887034][ MyCrawlerTask] CrawlerTask completed (0.7532s.) args=(), kwargs={}
[2024-02-22T16:15:20.887185][ CrawlingStep] Step completed (4.6405s.) args=(), kwargs={}
[2024-02-22T16:15:20.887246][ResizingStep ] Step is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.887408][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.887528][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.887592][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:20.887651][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:21.901267][ MyResizerTask] ResizerTask completed (1.0139s.) args=(), kwargs={}
[2024-02-22T16:15:21.906071][ MyResizerTask] ResizerTask completed (1.0186s.) args=(), kwargs={}
[2024-02-22T16:15:21.910313][ MyResizerTask] ResizerTask completed (1.0228s.) args=(), kwargs={}
[2024-02-22T16:15:21.914607][ MyResizerTask] ResizerTask completed (1.027s.) args=(), kwargs={}
[2024-02-22T16:15:21.914725][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:21.914773][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:21.914801][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:21.914828][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:22.924639][ MyResizerTask] ResizerTask completed (1.01s.) args=(), kwargs={}
[2024-02-22T16:15:22.932236][ MyResizerTask] ResizerTask completed (1.0175s.) args=(), kwargs={}
[2024-02-22T16:15:22.937866][ MyResizerTask] ResizerTask completed (1.0231s.) args=(), kwargs={}
[2024-02-22T16:15:22.942050][ MyResizerTask] ResizerTask completed (1.0273s.) args=(), kwargs={}
[2024-02-22T16:15:22.942173][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:22.942226][MyResizerTask ] ResizerTask is starting ... args=(), kwargs={}
[2024-02-22T16:15:23.952175][ MyResizerTask] ResizerTask completed (1.01s.) args=(), kwargs={}
[2024-02-22T16:15:23.956687][ MyResizerTask] ResizerTask completed (1.0145s.) args=(), kwargs={}
[2024-02-22T16:15:23.956783][ ResizingStep] Step completed (3.0696s.) args=(), kwargs={}
[2024-02-22T16:15:23.956816][ ExampleCrawlerWorkflow] Workflow completed (7.7102s.) args=(), kwargs={}
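The durations in the log can be cross-checked against the timestamps at the start of each line. The following is a small standalone sketch using only the standard library; the helper name elapsed_seconds is hypothetical, and the timestamps are copied from the CrawlingStep lines above.

```python
from datetime import datetime


def elapsed_seconds(start, end):
    """Return the seconds between two ISO 8601 timestamps from the log."""
    delta = datetime.fromisoformat(end) - datetime.fromisoformat(start)
    return delta.total_seconds()


if __name__ == "__main__":
    # "Step is starting" and "Step completed" timestamps for CrawlingStep
    step = elapsed_seconds("2024-02-22T16:15:16.246708",
                           "2024-02-22T16:15:20.887185")
    print(round(step, 4))
```

The result, 4.6405, agrees with the duration reported on the "Step completed" line. The three crawler batches (about 1.76s, 2.12s and 0.75s) add up to roughly the same figure, which is consistent with the batches running one after another.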