2. API Reference

class python_workflow.Task(name=None, *args, **kwargs)
Parameters:
  • name (str) – The name of the task

  • start_at (float | None) – time at which the task started

  • stop_at (float | None) – time at which the task completed

  • value – the value returned by run()

  • _args (list, optional) – not used by the Task itself; available inside run()

  • _kwargs (dict, optional) – not used by the Task itself; available inside run()

__init__(name=None, *args, **kwargs)

Create a threadable Task

Parameters:

name – The name of the Task, useful for logging.

property duration

Default : None

Returns the duration of the Task once it has completed.

Return type:

float or None

is_completed()

Default : False

Returns True when the Task is completed without any errors.

Return type:

bool

on_complete()
This method is called after run(). By default, it writes logs.
A typical override sends a notification to Slack or another service.
on_error(*args, **kwargs)

Raises the first positional argument.

on_start()
This method is called before run(). By default, it writes logs.
A typical override sends a notification to Slack or another service.
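
The hook order described for on_start() and on_complete() can be illustrated with a small stand-in. MiniTask below is a hypothetical class that only mimics the documented call order; it is not the library's code. With python_workflow installed, the same overrides would presumably go on a Task subclass instead. The events list stands in for a real notification client.

```python
class MiniTask:
    """Hypothetical stand-in that mimics the documented hook order of Task."""

    def __init__(self, name=None):
        self.name = name

    def on_start(self):
        pass  # the documented default writes logs here

    def on_complete(self):
        pass  # the documented default writes logs here

    def run(self):
        raise NotImplementedError("run() must be overridden")

    def start(self):
        # Assumed order: on_start(), then run(), then on_complete()
        self.on_start()
        value = self.run()
        self.on_complete()
        return value


class NotifyingTask(MiniTask):
    def __init__(self, name=None):
        super().__init__(name)
        self.events = []  # stands in for a Slack client or similar

    def on_start(self):
        self.events.append(f"{self.name} started")

    def on_complete(self):
        self.events.append(f"{self.name} completed")

    def run(self):
        self.events.append("running")
        return 42


task = NotifyingTask("demo")
result = task.start()
```

After start() returns, task.events holds the three entries in call order and result holds the value returned by run().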
reset()

Resets the timers to None; duration becomes 0.

run()
This method must be overridden.
Its body is executed in a thread, and its return value is collected by the Step.
start()

Starts the task, initializes the timers and writes logs. Catches exceptions raised by run() and re-raises them.

Returns:

the value returned from run()

Raises:

Exception

stop()

Stops the timers and returns the timer value.

Return type:

float
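
The lifecycle described for Task (timers, value, is_completed(), on_error() and start()) can be approximated as follows. This is a minimal sketch under stated assumptions, not the library's implementation: SketchTask, its _completed flag, the use of time.time() for the timers, and a duration of None before completion are all assumptions made for illustration.

```python
import time


class SketchTask:
    """Hypothetical re-creation of the documented Task lifecycle."""

    def __init__(self, name=None, *args, **kwargs):
        self.name = name
        self._args = args        # not used here; available inside run()
        self._kwargs = kwargs    # not used here; available inside run()
        self.start_at = None
        self.stop_at = None
        self.value = None
        self._completed = False  # assumed internal flag

    @property
    def duration(self):
        # Assumption: None until both timers are set
        if self.start_at is None or self.stop_at is None:
            return None
        return self.stop_at - self.start_at

    def is_completed(self):
        return self._completed

    def on_error(self, *args, **kwargs):
        raise args[0]  # raises the first positional argument

    def run(self):
        raise NotImplementedError("run() must be overridden")

    def stop(self):
        self.stop_at = time.time()
        return self.stop_at

    def start(self):
        self.start_at = time.time()
        try:
            self.value = self.run()
            self._completed = True
        except Exception as exc:
            self.on_error(exc)  # the default re-raises the exception
        finally:
            self.stop()         # the timer is stopped on success and failure
        return self.value


class Add(SketchTask):
    def run(self):
        a, b = self._args
        return a + b


class Boom(SketchTask):
    def run(self):
        raise ValueError("boom")


ok = Add("add", 2, 3)
ok_value = ok.start()

bad = Boom("boom")
try:
    bad.start()
    raised = False
except ValueError:
    raised = True
```

In this sketch a failing run() leaves is_completed() at False while still recording stop_at, which matches the description of is_completed() as true only when the Task finishes without errors.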

class python_workflow.Step(name=None, tasks=None, *args, **kwargs)

Bases: Task

Parameters:
  • tasks (Task[]) –

  • nb_thread (float) –

  • raise_error (bool) –

__init__(name=None, tasks=None, *args, **kwargs)
Parameters:
  • name (str) – the name of the Step

  • tasks (Task[]) –

  • args (list, optional) – not used by the Step itself; available inside run()

  • kwargs (dict, optional) – not used by the Step itself; available inside run()

  • nb_thread (float, default 4) – Number of threads running at the same time.

  • raise_error (bool, default True) – Whether the Step should stop when one of its tasks fails.

reset()

Resets its own timers, then the timers of each of its Tasks.

run()
The return value is collected by the Workflow.
Returns:

Values of Task[]
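
The behaviour described for Step (run the tasks on at most nb_thread threads and return their values) could be sketched with the standard library's ThreadPoolExecutor. This is an illustration under assumptions, not the library's code: run_step is a hypothetical helper, tasks are modelled as zero-argument callables, and recording None for a failed task when raise_error is False is a guess at a reasonable behaviour.

```python
from concurrent.futures import ThreadPoolExecutor


def run_step(tasks, nb_thread=4, raise_error=True):
    """Run zero-argument callables in a pool; return values in input order."""
    results = []
    # The reference types nb_thread as float; the pool is given an int here
    with ThreadPoolExecutor(max_workers=int(nb_thread)) as pool:
        futures = [pool.submit(task) for task in tasks]
        for future in futures:
            try:
                results.append(future.result())
            except Exception:
                if raise_error:
                    # Already-submitted tasks still finish before the
                    # exception leaves the `with` block
                    raise
                results.append(None)  # assumption: failed task yields None
    return results


def fail():
    raise RuntimeError("task failed")


values = run_step([lambda: 1, lambda: 2, lambda: 3], nb_thread=2)
tolerant = run_step([lambda: "a", fail], raise_error=False)
```

Collecting results by iterating over the futures in submission order keeps the returned list aligned with the input tasks, regardless of which thread finishes first.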

class python_workflow.Workflow(name=None, steps=None, *args, **kwargs)

Bases: Task

Parameters:

steps (Step[]) –

__init__(name=None, steps=None, *args, **kwargs)
Parameters:
  • name (str) – the name of the Workflow

  • steps (Step[]) –

  • args (list, optional) – not used by the Workflow itself; available inside run()

  • kwargs (dict, optional) – not used by the Workflow itself; available inside run()

reset()

Resets its own timers, then the timers of each of its Steps, then the timers of each of their Tasks.
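
The cascading reset can be pictured as a recursive walk over a tree: the Workflow resets itself, then each Step, and each Step resets its Tasks. The Node class below is a hypothetical stand-in used only to show that order; the placeholder timer values are arbitrary.

```python
class Node:
    """Hypothetical timed node; children model the Steps or Tasks it owns."""

    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)
        self.start_at = 1.0  # placeholder values, as if the node had run
        self.stop_at = 2.0

    def reset(self):
        # Reset this node's timers first, then cascade to its children
        self.start_at = None
        self.stop_at = None
        for child in self.children:
            child.reset()


tasks = [Node("task-1"), Node("task-2")]
step = Node("step", tasks)
workflow = Node("workflow", [step])

workflow.reset()
```

A single call on the root is enough to clear every timer in the tree, which is what allows a whole workflow to be run again from a clean state.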

run()
Returns:

Values of Step[]
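
The shape of the value returned by Workflow.run(), one entry per Step with each entry holding that Step's task values, can be illustrated as below. This is a sketch, not the library's implementation: run_workflow is a hypothetical helper, steps are modelled as lists of zero-argument callables, and running the steps one after another is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def run_workflow(steps, nb_thread=4):
    """Run each step in turn; a step is a list of zero-argument callables."""
    step_values = []
    for tasks in steps:
        with ThreadPoolExecutor(max_workers=nb_thread) as pool:
            # map() yields results in the order the tasks were given
            step_values.append(list(pool.map(lambda task: task(), tasks)))
    return step_values


extract = [lambda: "rows-a", lambda: "rows-b"]
load = [lambda: "loaded"]

values = run_workflow([extract, load])
```

Here values is a list of lists: the first entry holds the two results of the extract step and the second holds the single result of the load step.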