The asynciojobs API

The Scheduler classes

The PureScheduler class is a set of AbstractJobs that, together with their required relationships, form an execution graph.

class asynciojobs.purescheduler.PureScheduler(*jobs_or_sequences, jobs_window=None, timeout=None, shutdown_timeout=1, watch=None, verbose=False)

A PureScheduler instance is made of a set of AbstractJob objects.

The purpose of the scheduler object is to orchestrate an execution of these jobs that respects the required relationships, until they are all complete. It starts with the ones that have no requirement, and then triggers the other ones as their requirement jobs complete.

For this reason, the dependency/requirements graph must be acyclic.

Optionally, a scheduler orchestration can be confined to a finite number of concurrent jobs (see the jobs_window parameter below).

It is also possible to define a timeout attribute on the object, that will limit the execution time of a scheduler.

Running an AbstractJob means executing its co_run() method, which must be a coroutine.

The result of a job’s co_run() is NOT taken into account, as long as it returns without raising an exception. If it does raise an exception, overall execution is aborted iff the job is critical. In all cases, the result and/or exception of each individual job can be inspected and retrieved individually at any time, including of course once the orchestration is complete.

Parameters:
  • jobs_or_sequences – instances of AbstractJob or Sequence. The order in which they are mentioned is irrelevant.
  • jobs_window – is an integer that specifies how many jobs can be run simultaneously. None or 0 means no limit.
  • timeout – can be an int or float and is expressed in seconds; it applies to the overall orchestration of that scheduler, not to any individual job. Can be also None, which means no timeout.
  • shutdown_timeout – same meaning as timeout, but for the shutdown phase.
  • watch – if the caller passes a Watch instance, it is used in debugging messages to show the time elapsed with respect to that watch, instead of the wall clock.
  • verbose (bool) – flag that says if execution should be verbose.

Examples

Creating an empty scheduler:

s = Scheduler()

A scheduler with a single job:

s = Scheduler(Job(asyncio.sleep(1)))

A scheduler with 2 jobs in parallel:

s = Scheduler(Job(asyncio.sleep(1)),
              Job(asyncio.sleep(2)))

A scheduler with 2 jobs in sequence:

s = Scheduler(
        Sequence(
            Job(asyncio.sleep(1)),
            Job(asyncio.sleep(2))
        ))

In this document, the Schedulable name refers to a type hint that encompasses instances of either the AbstractJob or Sequence class.
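Conceptually, such a type hint could be written as follows - a sketch with stand-in classes, not the library's actual definition:

```python
from typing import Union

# Stand-ins for the real asynciojobs classes, for illustration only
class AbstractJob: ...
class Sequence: ...

# anything that can be handed to a scheduler
Schedulable = Union[AbstractJob, Sequence]
```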

add(job)

Adds a single Schedulable object; this method name is inspired by plain Python’s set.add().

Parameters:job – a single Schedulable object.
Returns:the scheduler object, for cascading insertions if needed.
Return type:self
check_cycles()

Performs a minimal sanity check. The purpose of this is primarily to check for cycles, and/or missing starting points.

It’s not embedded in co_run() because it is not strictly necessary, but it is safer to call this before running the scheduler if one wants to double-check the jobs’ dependency graph early on.

It might also help to have a sanitized scheduler, but here again this is up to the caller.

Returns:True if the topology is fine
Return type:bool
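The kind of sanity check performed here amounts to a topological feasibility test; here is a self-contained sketch of that idea (Kahn-style, not the library's actual implementation), operating on a plain mapping from each job to the set of jobs it requires:

```python
def has_no_cycle(requires):
    """requires maps each job name to the set of jobs it depends on.
    Returns True if the graph is acyclic, i.e. schedulable."""
    pending = {job: set(reqs) for job, reqs in requires.items()}
    while pending:
        # jobs whose requirements are all satisfied
        ready = [job for job, reqs in pending.items() if not reqs]
        if not ready:
            return False   # every remaining job waits on another one: cycle
        for job in ready:
            del pending[job]
        for reqs in pending.values():
            reqs.difference_update(ready)
    return True
```

An acyclic graph always exposes at least one job with no pending requirement at every step; a cycle is detected when no such job remains.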
coroutine co_run()

The primary entry point for running a scheduler. See also run() for a synchronous wrapper around this coroutine.

Runs member jobs (that is, schedules their co_run() method) in an order that satisfies their required relationships.

Proceeds to the end no matter what, except if either:

  • one critical job raises an exception, or
  • a timeout occurs.
Returns:True if neither of these two conditions occurs, False otherwise.
Return type:bool

Jobs marked as forever are not waited for.

No automatic shutdown is performed, user needs to explicitly call co_shutdown() or shutdown().

coroutine co_shutdown()

Shut down the scheduler, by sending the co_shutdown() method to all the jobs, possibly nested.

Within nested schedulers, a job receives the shutdown event when its enclosing scheduler terminates, and not at the end of the outermost scheduler.

Also note that all job instances receive the co_shutdown() method, even the ones that have not yet started; it is up to the co_shutdown() method to triage the jobs according to their life cycle status - see is_running() and similar.

This mechanism should be used for minimal housekeeping only; it is recommended that intrusive cleanup be made part of separate, explicit methods.

Note: typically in apssh for example, several jobs sharing the same ssh connection need to arrange for that connection to be kept alive across an entire scheduler lifespan, and closed later on. Historically there had been an attempt to deal with this automagically, through the present shutdown mechanism. However, this turned out to be the wrong choice, as the decision to close connections needs to be left to the user; additionally, with nested schedulers, this can become pretty awkward. Closing ssh connections is now to be achieved explicitly through a call to a specific apssh function.

Returns:True if all the co_shutdown() methods attached to the jobs in the scheduler complete within shutdown_timeout, which is an attribute of the scheduler. If that attribute is None, no timeout applies.
Return type:bool

Notes

There is probably space for a lot of improvement here:

  • behaviour is unspecified if any of the co_shutdown() methods raises an exception;
  • right now, a subscheduler that sees a timeout expiration does not cause the overall co_shutdown() to return False, which is arguable;
  • another possible weakness in the current implementation is that it does not support shutting down a scheduler that is still running.
debrief(details=False)

Designed for schedulers that have failed to orchestrate.

Print a complete report, that includes list() but also gives more stats and data.

dot_format()

Creates a graph that depicts the jobs and their requires relationships, in DOT Format.

Returns:
str: a representation of the graph in DOT Format underlying this scheduler.

See graphviz’s documentation, together with its Python wrapper library, for more information on the format and available tools.

See also Wikipedia on DOT for a list of tools that support the dot format.

As a general rule, asynciojobs has support for producing DOT format, but stops short of actually importing graphviz, which can be cumbersome to install - with the notable exception of the graph() method. See that method for how to convert a PureScheduler instance into a native graphviz.Digraph instance.
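To make the format concrete, here is a toy generator that renders a requirements mapping as DOT text - a hypothetical sketch, not what asynciojobs actually emits (the real output also carries styling attributes, see dot_style()):

```python
def dot_format(requires, name="scheduler"):
    """Render a requirements mapping as DOT text; an edge goes from a
    required job to the job that requires it."""
    lines = [f"digraph {name} {{"]
    for job, reqs in requires.items():
        for req in sorted(reqs):
            lines.append(f'  "{req}" -> "{job}";')
    lines.append("}")
    return "\n".join(lines)
```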


entry_jobs()

A generator that yields all jobs that have no requirement.

Examples

List all entry points:

for job in scheduler.entry_jobs():
    print(job)
exit_jobs(*, discard_forever=True, compute_backlinks=True)

A generator that yields all jobs that are not a requirement to another job; it is thus in some sense the reverse of entry_jobs().

Parameters:
  • discard_forever – if True, jobs marked as forever are skipped; forever jobs often have no successors, but are seldom of interest when calling this method.
  • compute_backlinks – for this method to work properly, it is necessary to compute backlinks, an internal structure that holds the opposite of the required relationship. Passing False here allows that stage to be skipped, when the relationship is known to be up to date already.
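Both generators can be modeled on a plain requirements mapping; this sketch (not the library's internals, which rely on backlinks) shows how the two are mirror images of each other:

```python
def entry_jobs(requires):
    """Yield jobs that have no requirement."""
    for job, reqs in requires.items():
        if not reqs:
            yield job

def exit_jobs(requires):
    """Yield jobs that no other job requires."""
    required_somewhere = set().union(*requires.values()) if requires else set()
    for job in requires:
        if job not in required_somewhere:
            yield job
```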
export_as_dotfile(filename)

This method does not require graphviz to be installed, it writes a file in dot format for post-processing with e.g. graphviz’s dot utility. It is a simple wrapper around dot_format().

Parameters:filename – where to store the result.
Returns:a message that can be printed for information, like e.g. "(Over)wrote foo.dot"
Return type:str

See also the graph() method that serves a similar purpose but natively as a graphviz object.

As an example of post-processing, a PNG image can be then obtained from that dotfile with e.g.:

dot -Tpng foo.dot -o foo.png
export_as_pngfile(filename)

Convenience wrapper that creates a png file. Like graph(), it requires the graphviz package to be installed.

Parameters:filename – output filename, without the .png extension
Returns:created file name

Notes

  • This actually uses the binary dot program.
  • A file named as the output but with a .dot extension is created as an artefact by this method.
failed_critical()
Returns:returns True if and only if co_run() has failed because a critical job has raised an exception.
Return type:bool
failed_time_out()
Returns:returns True if and only if co_run() has failed because of a time out.
Return type:bool
graph()
Returns:a graph
Return type:graphviz.Digraph

This method serves the same purpose as export_as_dotfile(), but it natively returns a graph instance. For that reason, its usage requires the installation of the graphviz package.

This method is typically useful in a Jupyter notebook, so as to visualize a scheduler in graph format - see http://graphviz.readthedocs.io/en/stable/manual.html#jupyter-notebooks for how this works.

The dependency from asynciojobs to graphviz is limited to this method and export_as_pngfile(), as these are the only places that need it, and as installing graphviz can be cumbersome.

For example, on MacOS I had to do both:

brew install graphviz     # for the C/C++ binary stuff
pip3 install graphviz     # for the python bindings
iterate_jobs(scan_schedulers=False)

A generator that scans all jobs and subjobs.

Parameters:scan_schedulers – if set, nested schedulers are listed as well; otherwise, only actual jobs are reported.
list(details=False)

Prints a complete list of jobs in topological order, with their status summarized with a few signs. See the README for examples and a legend.

Beware that this might raise an exception if check_cycles() would return False, i.e. if the graph is not acyclic.

list_safe()

Print jobs in no specific order, the advantage being that it works even if the scheduler is broken with respect to check_cycles(). On the other hand, this method is not able to list requirements.

orchestrate(*args, **kwds)

A synchronous wrapper around co_run(); please refer to that coroutine for details on parameters and return value.

The canonical name for this method is run(); for historical reasons, orchestrate() is kept as an alias.

remove(job)

Removes a single Schedulable object; this method name is inspired from plain python set.remove()

Parameters:job – a single Schedulable object.
Raises:KeyError – if job not in scheduler.
Returns:the scheduler object, for cascading insertions if needed.
Return type:self
run(*args, **kwds)

A synchronous wrapper around co_run(); please refer to that coroutine for details on parameters and return value.

The canonical name for this method is run(), but for historical reasons orchestrate() can also be used as an alias.

sanitize(verbose=None)

This method ensures that the requirements relationship is closed within the scheduler. In other words, it removes any requirement attached to a job in this scheduler, but that is not itself part of the scheduler.

This can come in handy with schedulers whose composition depends on external conditions.

In any case it is crucial that this property holds for co_run() to perform properly.

Parameters:verbose – if not None, defines verbosity for this operation. Otherwise, the object’s verbose attribute is used. In verbose mode, jobs that are changed, i.e. that have requirement(s) dropped because they are not part of the same scheduler, are listed, together with their container scheduler.
Returns:True if the scheduler object was fine, and False if at least one removal was needed.
Return type:bool
shutdown()

A synchroneous wrapper around co_shutdown().

Returns:True if everything went well, False otherwise; see co_shutdown() for details.
Return type:bool
stats()

Returns a string like e.g. 2D + 3R + 4I = 9, meaning that the scheduler currently has 2 done, 3 running and 4 idle jobs.
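The shape of that string is easy to reproduce; a sketch (assuming, for simplicity, only the done/running/idle states - the real method may also report scheduled jobs):

```python
from collections import Counter

def format_stats(job_states):
    """job_states: iterable of one-letter states, 'D'one, 'R'unning, 'I'dle.
    Renders them in the '2D + 3R + 4I = 9' style described above."""
    counts = Counter(job_states)
    parts = [f"{counts[s]}{s}" for s in "DRI" if counts[s]]
    return " + ".join(parts) + f" = {sum(counts.values())}"
```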

topological_order()

A generator function that scans the graph in topological order, in the same order as the orchestration, i.e. starting from jobs that have no dependencies, and moving forward.

Beware that this is not a separate iterator, so it can’t be nested, which in practice should not be a problem.

Examples

Assuming all jobs have a label attribute, print them in the “right” order:

for job in scheduler.topological_order():
    print(job.label)
update(jobs)

Adds a collection of Schedulable objects; this method is named after set.update().

Parameters:jobs – a collection of Schedulable objects.
Returns:the scheduler object, for cascading insertions if needed.
Return type:self
why()
Returns:a message explaining why co_run() has failed, or "FINE" if it has not failed.
Return type:str

Notes

At this point the code does not check that co_run() has actually been called.

The Scheduler class makes it easier to nest scheduler objects.

class asynciojobs.scheduler.Scheduler(*jobs_or_sequences, jobs_window=None, timeout=None, shutdown_timeout=1, watch=None, verbose=False, **kwds)

The Scheduler class is a mixin of the two PureScheduler and AbstractJob classes.

As such it can be used to create nested schedulers, since it is a scheduler that can contain jobs, and at the same time it is a job, and so it can be included in a scheduler.

Parameters: same as for PureScheduler; extra keyword arguments are passed to AbstractJob.

Example

Here’s how to create a very simple scheduler with an embedded sub-scheduler; the whole result is equivalent to a simple 4-step sequence:

main = Scheduler(
   Sequence(
     Job(aprint("begin", duration=0.25)),
     Scheduler(
       Sequence(
         Job(aprint("middle-begin", duration=0.25)),
         Job(aprint("middle-end", duration=0.25)),
       )
     ),
     Job(aprint("end", duration=0.25)),
   ))
main.run()

Notes

There can be several good reasons for using nested schedulers:

  • the scope of the jobs_window attribute applies to a scheduler as a whole, so a nested scheduler is a means to apply windowing to a specific set of jobs;
  • likewise the timeout attribute only applies to the run for the whole scheduler;
  • you can use forever jobs that will be terminated earlier than the end of the global scheduler;
  • strictly speaking, the outermost instance in this example could be an instance of PureScheduler, but in practice it is simpler to always create instances of Scheduler.

Using an intermediate-level scheduler can in some cases help alleviate or solve such issues.
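The confining effect of jobs_window is essentially that of a semaphore around each job; this plain-asyncio sketch (not asynciojobs code) illustrates why attaching a window to a nested scheduler restricts only that scheduler's jobs:

```python
import asyncio

async def run_with_window(coros, window):
    """Run coroutines concurrently, but at most `window` at any given time."""
    gate = asyncio.Semaphore(window)

    async def confined(coro):
        async with gate:          # wait for a free slot in the window
            return await coro

    return await asyncio.gather(*(confined(c) for c in coros))
```

A nested scheduler with its own window would simply use its own semaphore, independent of the enclosing scheduler's.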

check_cycles()

Supersedes check_cycles() to account for nested schedulers.

Returns:True if this scheduler, and all its nested schedulers at any depth, have no cycle and can be safely scheduled.
Return type:bool
coroutine co_run()

Supersedes the co_run() method in order to account for critical schedulers.

Scheduler being a subclass of AbstractJob, we need to account for the possibility that a scheduler is defined as critical.

If the inherited co_run() method fails because of an exception or a timeout, a critical Scheduler will trigger an exception, instead of returning False:

  • if orchestration failed because an internal job has raised an exception, raise that exception;
  • if it failed because of a timeout, raise TimeoutError
Returns:True if everything went well; False for non-critical schedulers that go south.
Return type:bool

Raises:
  • TimeoutError – for critical schedulers that do not complete in time,
  • Exception – for a critical scheduler that has a critical job that triggers an exception, in which case it bubbles up.
dot_cluster_name()

Assigns a name to the subgraph that will represent a nested scheduler; the DOT format requires this name to start with cluster_.


Job-like classes

This module defines AbstractJob, that is the base class for all the jobs in a Scheduler, as well as a basic concrete subclass Job for creating a job from a coroutine.

It also defines a couple of simple job classes.

class asynciojobs.job.AbstractJob(*, forever=False, critical=True, label=None, required=None, scheduler=None)

AbstractJob is a virtual class:

  • it offers some very basic graph-related features to model requirements a la Makefile;
  • its subclasses are expected to implement co_run() and co_shutdown() methods that specify the actual behaviour of the job, as coroutines;
  • AbstractJob is mostly a companion class to the PureScheduler class, that triggers these co_* methods.

Life Cycle: AbstractJob is also aware of a common life cycle for all jobs, which can be summarized as follows:

idle → scheduled → running → done

In un-windowed schedulers, there is no distinction between scheduled and running; in other words, in this case a job goes directly from idle to running.

On the other hand, in windowed orchestrations - see the jobs_window attribute to PureScheduler() - a job can be scheduled but not yet running, because it is waiting for a slot in the global window.
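The predicates documented below (is_idle(), is_scheduled(), is_running(), is_done()) are monotonic over this progression; a toy model of the life cycle, for illustration only:

```python
# Ordered life-cycle states, as described above
STATES = ["idle", "scheduled", "running", "done"]

class LifeCycle:
    """Minimal model of a job's life cycle: each predicate holds from the
    moment the corresponding state is reached, onwards."""
    def __init__(self):
        self.state = "idle"

    def advance(self):
        i = STATES.index(self.state)
        if i < len(STATES) - 1:
            self.state = STATES[i + 1]

    def is_idle(self):
        return self.state == "idle"

    def is_scheduled(self):
        return STATES.index(self.state) >= STATES.index("scheduled")

    def is_running(self):
        return STATES.index(self.state) >= STATES.index("running")

    def is_done(self):
        return self.state == "done"
```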

Parameters:
  • forever (bool) – if set, means the job is not returning at all and runs forever; in this case Scheduler.orchestrate() will not wait for that job, and will terminate it once all the regular - i.e. not-forever - jobs are done.
  • critical (bool) – if set, this flag indicates that any exception raised during the execution of that job should result in the scheduler aborting its run immediately. The default behaviour is to let the scheduler finish its jobs, at which point the jobs can be inspected for exceptions or results.
  • required – this can be one, or a collection of, jobs that will constitute the job’s requirements; requirements can be added later on as well.
  • label (str) –

    for convenience mostly, this allows you to specify how that particular job should be displayed by the scheduler, either in textual form by Scheduler.list(), or in graphical form by Scheduler.graph(). See also text_label() and graph_label() for how this is used.

    As far as labelling, each subclass of AbstractJob implements a default labelling scheme, so it is not mandatory to set a specific label on each job instance, however it is sometimes useful.

    Labels must not be confused with details, see details()

  • scheduler – this can be an instance of a PureScheduler object, in which the newly created job instance is immediately added. A job instance can also be inserted in a scheduler instance later on.

Note: a Job instance must be added to at most one Scheduler instance; the code does not check this property, but be aware that odd behaviours can be observed if it is not fulfilled.

coroutine co_run()

Abstract virtual - needs to be implemented

coroutine co_shutdown()

Abstract virtual - needs to be implemented.

details()

An optional method to implement on concrete job classes; if it returns a non-None value, these additional details about that job will get printed by asynciojobs.purescheduler.PureScheduler.list() and asynciojobs.purescheduler.PureScheduler.debrief() when called with details=True.

dot_style()

This method computes the DOT attributes which are used to style boxes according to critical / forever / and similar.

Legend is quite simply that:

  • schedulers have sharp angles, while other jobs have rounded corners,
  • critical jobs have a colored and thick border, and
  • forever jobs have a dashed border.
Returns:a dict-like mapping that sets DOT attributes for that job.
Return type:DotStyle
graph_label()

This method is intended to be redefined by daughter classes.

Returns:a string used by the Scheduler methods that produce a graph, such as graph() and export_as_dotfile().

Because of the way graphs are presented, it can contain “newline” characters, which will render as line breaks in the output graph.

If this method is not defined on a concrete class, then the text_label() method is used instead.

is_critical()
Returns:whether this job is a critical job or not.
Return type:bool
is_done()
Returns:True if the job has completed.
Return type:bool

If this method returns True, it implies that is_scheduled() and is_running() would also return True at that time.

is_idle()
Returns:True if the job has not yet been scheduled, which in other words means that at least one of its requirements is not fulfilled.
Return type:bool

Implies not is_scheduled(), and so a fortiori not is_running() and not is_done().

is_running()
Returns:once a job starts, it tries to get a slot in the windowing system. This method returns True if the job has received the green light from the windowing system. Implies is_scheduled().
Return type:bool
is_scheduled()
Returns:True if the job has been scheduled.
Return type:bool

If True, it means that the job’s requirements are met, and it has proceeded to the windowing system; equivalent to not is_idle().

raised_exception()
Returns:an exception if the job has completed by raising an exception, and None otherwise.
repr_id()
Returns:the job’s id inside the scheduler, or ‘??’ if that was not yet set by the scheduler.
Return type:str
repr_main()
Returns:
standardized body of the object’s repr,
like e.g. <SshJob `my command`>.
Return type:str
repr_requires()
Returns:text part that describes requirements
Return type:str
repr_result()
Returns:
standardized repr’s part that shows
the result or exception of the job.
Return type:str
repr_short()
Returns:
a 4-character string (in fact 7 with interspaces) that summarizes the 4 dimensions of the job, that is to say:
  • its point in the lifecycle (idle → scheduled → running → done),
  • whether it is declared as forever,
  • whether it is declared as critical,
  • whether it triggered an exception.
Return type:str
requires(*requirements, remove=False)
Parameters:
  • requirements – an iterable of AbstractJob instances that are added to the requirements.
  • remove (bool) – if set, the requirements are dropped rather than added.
Raises:

KeyError – when trying to remove dependencies that were not present.

For convenience, any nested structure made of job instances can be provided, and if None objects are found, they are silently ignored. For example, with j{1,2,3,4} being jobs or sequences, all the following calls are legitimate:

  • j1.requires(None)
  • j1.requires([None])
  • j1.requires((None,))
  • j1.requires(j2)
  • j1.requires(j2, j3)
  • j1.requires([j2, j3])
  • j1.requires(j2, [j3, j4])
  • j1.requires((j2, j3))
  • j1.requires(([j2], [[[j3]]]))
  • Any of the above with remove=True.

For dropping dependencies instead of adding them, use remove=True
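That tolerance for nested structures and None boils down to a recursive flattening step; a hypothetical helper showing the idea (not the library's actual code):

```python
def flatten_requirements(*requirements):
    """Yield job objects from arbitrarily nested lists/tuples/sets,
    silently skipping None."""
    for item in requirements:
        if item is None:
            continue
        if isinstance(item, (list, tuple, set)):
            yield from flatten_requirements(*item)
        else:
            yield item
```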

result()
Returns:when this job has completed and has not raised an exception, this method lets you retrieve the job’s result, i.e. the value returned by its co_run() method.
standalone_run()

A convenience helper that just runs this one job on its own.

Mostly useful for debugging the internals of that job, e.g. for checking for gross mistakes and other exceptions.

text_label()

This method is intended to be redefined by daughter classes.

Returns:a one-line string that describes this job.

This representation for the job is used by the Scheduler object through its list() and debrief() methods, i.e. when a scheduler is printed out in textual format.

The overall logic is to always use the instance’s label attribute if set, or to use this method otherwise. If none of this returns anything useful, the textual label used is NOLABEL.

class asynciojobs.job.Job(corun, *args, coshutdown=None, **kwds)

The simplest concrete job class, for building an instance of AbstractJob from a Python coroutine.

Parameters:
  • corun – a coroutine to be evaluated when the job runs
  • coshutdown – an optional coroutine to be evaluated when the scheduler is done running
  • scheduler – passed to AbstractJob
  • required – passed to AbstractJob
  • label – passed to AbstractJob

Example

To create a job that prints a message and waits for a fixed delay:

async def aprint(message, delay):
    print(message)
    await asyncio.sleep(delay)

j = Job(aprint("Welcome - idling for 3 seconds", 3))
coroutine co_run()

Implementation of the method expected by AbstractJob

coroutine co_shutdown()

Implementation of the method expected by AbstractJob, or more exactly by asynciojobs.purescheduler.PureScheduler.co_shutdown()

text_label()

Implementation of the method expected by AbstractJob


The Sequence class

This module defines the Sequence class, which is designed to ease the building of schedulers.

class asynciojobs.sequence.Sequence(*sequences_or_jobs, required=None, scheduler=None)

A Sequence is an object that organizes a set of AbstractJob instances in a sequence. Its main purpose is to add a single required relationship per job in the sequence, except for the first one, which instead receives the sequence’s own requirements as its required.

If scheduler is passed to the sequence’s constructor, all the jobs passed to the sequence are added in that scheduler.

Sequences are not first-class citizens, in the sense that the scheduler primarily ignores these objects, only the jobs inside the sequence matter.

However, a sequence can be used essentially anywhere a job could be: it can be inserted in a scheduler, added as a requirement, and it can have requirements too.

Parameters:
  • sequences_or_jobs – each must be a Schedulable object, the order of course is important here
  • required – one, or a collection of, Schedulable objects that will become the requirements for the first job in the sequence
  • scheduler – if provided, the jobs in the sequence will be inserted in that scheduler.
append(*sequences_or_jobs)

Add these jobs or sequences at the end of the present sequence.

Parameters:sequences_or_jobs – each must be a Schedulable object.
requires(*requirements)

Adds requirements to the sequence, that is to say, to the first job in the sequence.

Parameters:requirements – each must be a Schedulable object.

Notes on ordering

Schedulers and job requirements are essentially sets of jobs, and from a semantic point of view, order does not matter.

However for debugging/cosmetic reasons, keeping track of creation order can be convenient.

So using OrderedSet looks like a good idea; but it turns out that on some distros like Fedora, installing OrderedSet can be a pain, as it involves recompiling C code, which in turn pulls in a great deal of dependencies.

For this reason, we use OrderedSet only if available, and resort to regular sets otherwise. On macOS or Ubuntu, fortunately, this can simply be achieved with:

pip3 install orderedset

or alternatively with:

pip3 install asynciojobs[ordered]
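The fallback described above can be sketched as a guarded import (the module-level logic in asynciojobs may differ in its details):

```python
try:
    # keeps insertion order, nicer for debugging output
    from orderedset import OrderedSet as JobSet
except ImportError:
    # semantically equivalent, but iteration order is unspecified
    JobSet = set
```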

Convenience classes

The PrintJob class is a specialization of the AbstractJob class, mostly useful for debugging, tests and tutorials.

class asynciojobs.printjob.PrintJob(*messages, sleep=None, banner=None, scheduler=None, label=None, required=None)

A job that just prints messages, and optionally sleeps for some time.

Parameters:
  • messages – passed to print as-is
  • sleep – optional, an int or float describing in seconds how long to sleep after the messages get printed
  • banner – optional, a fixed text printed out before the messages like e.g. 40*'='; it won’t make it into details()
  • scheduler – passed to AbstractJob
  • required – passed to AbstractJob
  • label – passed to AbstractJob
coroutine co_run()

Implementation of the method expected by AbstractJob

coroutine co_shutdown()

Implementation of the method expected by AbstractJob; does nothing.

details()

Implementation of the method expected by AbstractJob

A utility to print time and compute durations, mostly for debugging and tests.

class asynciojobs.watch.Watch(message=None, *, show_elapsed=True, show_wall_clock=False)

This class essentially remembers a starting point, so that durations relative to that epoch can be printed for debug instead of a plain timestamp.

Parameters:
  • message (str) – used in the printed message at creation time,
  • show_elapsed (bool) – tells if a message with the elapsed time needs to be printed at creation time (elapsed will be 0),
  • show_wall_clock (bool) – same for the wall clock.

Examples

Here’s a simple use case; note that print_wall_clock() is a static method because it is mostly useful, precisely, when you do not have a Watch object at hand:

$ python3
Python 3.6.4 (default, Mar  9 2018, 23:15:12)
<snip>
>>> from asynciojobs import Watch
>>> import time
>>> watch = Watch("hello there"); time.sleep(1); watch.print_elapsed()
000.000  hello there
001.000  >>>
>>>
>>> Watch.print_wall_clock()
20:48:27.782 >>>
elapsed()
Returns:number of seconds elapsed since start, formatted on 7 characters: 3 for seconds, a dot, 3 for milliseconds
Return type:str
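That SSS.MMM layout is a plain zero-padded float format; a one-liner reproducing it (a sketch, not the class's actual code):

```python
def format_elapsed(seconds):
    """Format a duration on 7 characters: 3 for seconds, a dot,
    3 for milliseconds, e.g. 1.5 -> '001.500'."""
    return f"{seconds:07.3f}"
```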
print_elapsed(suffix=' ')

Print the elapsed time since start in format SSS.MMM + a suffix.

Parameters:suffix (str) – is appended to the output; to be explicit, by default no newline is added.
static print_wall_clock(suffix=' ')

Print current time in HH:MM:SS.MMM + a suffix.

Parameters:suffix (str) – is appended to the output; to be explicit, by default no newline is added.
reset()

Use current wall clock as starting point.

seconds()
Returns:time elapsed since start, in seconds.
Return type:float