# -*- coding: utf-8 -*-
"""
This module defines :class:`AbstractJob`, that is the base class
for all the jobs in a Scheduler, as well as a basic concrete subclass
:class:`Job` for creating a job from a coroutine.
It also defines a couple of simple job classes.
"""
import sys
import asyncio
from .bestset import BestSet
from .dotstyle import DotStyle
# pylint settings
# W0212: we have a lot of accesses to protected members of other classes
# R0914 Too many local variables
# pylint: disable=W0212
# what we need is a way to attach our own Job instance to the corresponding
# Task instance (and back) right after Task creation, so that
# (*) once asyncio.wait is done, we can easily find out
#     which Jobs are done or pending
# (*) from one Job, we can easily know what its status is by
#     looking into its Task object - if already scheduled

# Scheduler == graph
# Job == node
class AbstractJob:                                      # pylint: disable=R0902
    """
    AbstractJob is a virtual class:

    * it offers some very basic graph-related features
      to model requirements *a la* Makefile;
    * its subclasses are expected to implement a `co_run()`
      and a `co_shutdown()` methods that specify
      the actual behaviour of the job, as coroutines.
    * AbstractJob is mostly a companion class to the
      :class:`~asynciojobs.purescheduler.PureScheduler` class,
      that triggers these co_* methods.

    **Life Cycle**: AbstractJob is also aware of a common life cycle
    for all jobs, which can be summarized as follows:

    **idle** → **scheduled** → **running** → **done**

    In un-windowed schedulers, there is no distinction between
    scheduled and running. In other words, in this case a job goes directly
    from **idle** to **running**.

    On the other hand, in windowed orchestrations - see the ``jobs_window``
    attribute to
    :meth:`~asynciojobs.purescheduler.PureScheduler` -
    a job can be scheduled but not yet running, because it is waiting
    for a slot in the global window.

    Args:
      forever (bool): if set, means the job
        is not returning at all and runs forever;
        in this case ``Scheduler.orchestrate()``
        will not wait for that job, and will terminate it
        once all the regular - i.e. not-forever - jobs are done.

      critical (bool): if set,
        this flag indicates that any exception raised during the
        execution of that job should result in the scheduler
        aborting its run immediately. Otherwise the scheduler is left
        to finish its jobs, at which point
        the jobs can be inspected for exceptions or results.
        Defaults to ``True`` in this constructor.

      required: this can be one, or a collection of, jobs that will
        make the job's requirements;
        requirements can be added later on as well.

      label (str): for convenience mostly, allows to specify
        the way that particular job should be displayed by the scheduler,
        either in textual form by ``Scheduler.list()``, or in graphical
        form by ``Scheduler.graph()``. See also :meth:`text_label()`
        and :meth:`graph_label()` for how this is used.

        As far as labelling, each subclass of :class:`AbstractJob`
        implements a default labelling scheme, so it is not mandatory
        to set a specific label on each job instance, however it is
        sometimes useful.

        Labels must not be confused with details, see :meth:`details()`

      scheduler: this can be an instance of a
        :class:`~asynciojobs.purescheduler.PureScheduler` object, in which
        the newly created job instance is immediately added.
        A job instance can also be inserted in a scheduler instance
        later on.

    **Note**: a Job instance must only be added in **one Scheduler instance**
    at most - be aware that the code performs no check on this property,
    and that odd behaviours can be observed if it is not fulfilled.
    """

    def __init__(self, *,                               # pylint: disable=R0913
                 forever=False, critical=True, label=None,
                 required=None, scheduler=None):
        self.forever = forever
        self.critical = critical
        # access to labelling is done through methods
        self.label = label
        # for convenience, one can mention
        # only one, or a collection of, AbstractJobs
        self.required = BestSet()
        self.requires(required)
        # once submitted in the asyncio loop/scheduler,
        # `co_run()` gets embedded in a Task object,
        # that is our handle when talking to asyncio.wait
        self._task = None
        # this is updated by the Window class when the job makes it through
        self._running = False
        # ==== fields for our friend Scheduler all start with _s_
        # this is for graph browsing algos
        self._s_mark = None
        # the reverse of required
        self._s_successors = BestSet()
        # this attribute is reserved for use by the scheduler
        # that will for example store there an ordering information,
        # which in turn can be used for printing relationships
        # by Scheduler.list() and similar
        self._sched_id = None
        # convenience again; this is done last so that the scheduler
        # only ever gets to see a fully initialized job
        if scheduler is not None:
            scheduler.add(self)

    def _set_sched_id(self, start, id_format):
        """
        Works as a complicit to Scheduler._set_sched_ids.
        It sets the local ``_sched_id`` attribute and returns the index
        for the next job.

        This is defined as a method so that (nestable) Scheduler
        can redefine it.

        Args:
          start (int): the index to assign to this job
          id_format (str): a ``str.format`` template, applied to ``start``

        Returns:
          int: next index, in this case ``start + 1``
        """
        self._sched_id = str(id_format.format(start))
        return start + 1

    def _job_count(self):                               # pylint: disable=r0201
        """
        Complicit to Scheduler._total_length()

        Returns:
          int: always 1 for a plain job
        """
        return 1

    # don't use parameter recursive
    def _list(self, details, depth, _):
        """
        Complicit to PureScheduler.list()

        Prints a one-line summary of the job, indented with one ``>``
        per ``depth`` level; if ``details`` is set, the output of
        :meth:`details()` is printed as well, unless it is None.
        """
        indent = ('>'*depth + ' ') if depth else ''
        print("{} {} {}{} {} {}"
              .format(self.repr_id(),
                      self.repr_short(),
                      indent,
                      self.repr_main(),
                      self.repr_result(),
                      self.repr_requires()))
        if details:
            extra = self.details()           # pylint: disable=e1111
            if extra is not None:
                print(extra)

    # don't use parameter recursive
    def _list_safe(self, _):
        """
        Complicit to PureScheduler.list_safe()

        Same idea as ``_list()``, except that the result part is left out.
        """
        print("{} {} {} {}"
              .format(self.repr_short(),
                      self.repr_id(),
                      self.repr_main(),
                      self.repr_requires()))

    def _iterate_jobs(self, scan_schedulers):           # pylint: disable=w0613
        """
        A plain job iterates on itself only;
        the ``scan_schedulers`` parameter is not used here.
        """
        yield self

    def _get_text_label(self):
        # In terms of labelling, things have become a little tricky over
        # time. When listing an instance of Scheduler, there are 2 ways
        # we need to show a job
        #
        # * first there is a plain label, that may be set at creation time
        #
        # * second, when showing references (like the jobs that a given job
        #   requires), we show ids like '01' and similar.
        #   Except that, the job itself has no idea about that at first,
        #   it's the Scheduler instance that decides on that.
        #
        # use instance-specific label if set
        attempt = self.label
        if attempt is not None:
            return attempt
        # otherwise, try self.text_label() and use that
        attempt = self.text_label()          # pylint: disable=e1111
        if attempt is not None:
            return attempt
        # otherwise
        return "NOLABEL"

    def _get_graph_label(self):
        # a similar logic for a graphical label
        # we don't need to bother about _sched_id here though
        # also we try graph_label() first, and resort to text_label()
        # if it's not redefined on the object
        attempt = self.graph_label()         # pylint: disable=e1111
        if attempt is not None:
            return attempt
        return "{}: {}".format(self.repr_id(), self._get_text_label())

    def text_label(self):
        """
        This method is intended to be redefined by daughter classes.

        Returns:
          a one-line string that describes this job.

        This representation for the job is used by the Scheduler object
        through its :meth:`~asynciojobs.purescheduler.PureScheduler.list()`
        and :meth:`~asynciojobs.purescheduler.PureScheduler.debrief()`
        methods, i.e. when a scheduler is printed out in textual format.

        The overall logic is to always use the instance's ``label`` attribute
        if set, or to use this method otherwise. If none of this returns
        anything useful, the textual label used is ``NOLABEL``.
        """
        return None

    def graph_label(self):
        """
        This method is intended to be redefined by daughter classes.

        Returns:
          a string used by the Scheduler methods
          that produce a graph, such as
          :meth:`~asynciojobs.purescheduler.PureScheduler.graph` and
          :meth:`~asynciojobs.purescheduler.PureScheduler.export_as_dotfile`.

        Because of the way graphs are presented, it can contain "newline"
        characters, that will render as line breaks in the output graph.

        If this method is not defined on a concrete class,
        then the :meth:`text_label()` method is used instead.
        """
        return None

    def dot_style(self):                                # pylint: disable=c0111
        """
        This method computes the DOT attributes
        which are used to style boxes according to
        critical / forever / and similar.

        Legend is quite simply that:

        * schedulers have sharp angles, while other jobs have
          rounded corners,
        * critical jobs have a colored and thick border, and
        * forever jobs have a dashed border.

        Returns:
          DotStyle: a dict-like mapping that sets DOT
          attributes for that job.
        """
        # see also https://graphviz.gitlab.io/_pages/doc/info/attrs.html
        style = DotStyle()
        # style; DotStyle knows how to deal with lists
        style['style'] = []
        # label
        label = self._get_graph_label()
        if label != 'NOLABEL':
            style['label'] = label
        # common attributes : box shape
        style['shape'] = 'box'
        # schedulers keep sharp corners, jobs have rounded corners
        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid a circular import - confirm
        from .purescheduler import PureScheduler
        if not isinstance(self, PureScheduler):
            style['style'].append('rounded')
        # forever items have a dashed line
        if self.forever:
            style['style'].append('dashed')
        # critical elements have a thicker red border
        if self.is_critical():
            style['color'] = 'red'
            style['penwidth'] = 2
        else:
            style['penwidth'] = 0.5
        return style

    ##########
    # unicode version
    # _c_frowning_face = "\u2639" # ☹
    # _c_smiling_face = "\u263b" # ☻
    _c_saltire = "\u2613"  # ☓
    _c_circle_arrow = "\u21ba"  # ↺
    _c_black_flag = "\u2691"  # ⚑
    _c_white_flag = "\u2690"  # ⚐
    _c_warning = "\u26a0"  # ⚠
    _c_black_star = "\u2605"  # ★
    _c_sun = "\u2609"  # ☉
    _c_infinity = "\u221e"  # ∞

    # cached outcome of _detect_support_for_unicode(); None means unknown
    _has_support_for_unicode = None    # type: bool

    @classmethod
    def _detect_support_for_unicode(cls):
        """
        Tells whether the non-ASCII badge characters can be encoded
        with the encoding of ``sys.stdout``; the outcome is computed once
        and then cached in ``cls._has_support_for_unicode``.

        Returns:
          bool: True if ``_short_unicode()`` is safe to print.
        """
        if cls._has_support_for_unicode is None:
            # somehow on the test VB ubuntu box for apssh
            # sys.stdout.encoding is None; also sys.stdout itself
            # may be None, hence getattr
            encoding = getattr(sys.stdout, 'encoding', None) or 'UTF-8'
            # probe every character that _short_unicode() may output,
            # not just one of them
            glyphs = "".join((
                cls._c_saltire, cls._c_circle_arrow,
                cls._c_black_flag, cls._c_white_flag,
                cls._c_warning, cls._c_black_star,
                cls._c_sun, cls._c_infinity))
            try:
                glyphs.encode(encoding)
                cls._has_support_for_unicode = True
            # LookupError is what an unknown encoding name triggers
            except (UnicodeEncodeError, LookupError):
                cls._has_support_for_unicode = False
        return cls._has_support_for_unicode

    def _badge(self, *, done, running, scheduled, idle,  # pylint: disable=R0913
               critical, raised, healthy, forever):
        """
        Helper shared by ``_short_unicode()`` and ``_short_ascii()``:
        picks one glyph per dimension among the ones provided, and
        assembles them as *critical exception lifecycle forever*,
        separated with single spaces.
        """
        # where is it in the lifecycle
        if self.is_done():
            c_running = done
        elif self.is_running():
            c_running = running
        elif self.is_scheduled():
            c_running = scheduled
        else:
            c_running = idle
        # is it critical or not ?
        c_crit = critical if self.is_critical() else " "
        # has it raised an exception or not ?
        if self.raised_exception():
            c_boom = raised
        elif self.is_running():
            c_boom = healthy
        else:
            c_boom = " "
        # is it going forever or not
        c_forever = forever if self.forever else " "
        return "{} {} {} {}".format(c_crit, c_boom, c_running, c_forever)

    def _short_unicode(self):
        """
        A small (7 chars) badge that summarizes the job's internal attributes
        uses non-ASCII characters
        """
        # the separating white spaces are there because unicode chars
        # in terminal tend to be wider than others
        return self._badge(
            done=self._c_saltire, running=self._c_circle_arrow,
            scheduled=self._c_black_flag, idle=self._c_white_flag,
            critical=self._c_warning,
            raised=self._c_black_star, healthy=self._c_sun,
            forever=self._c_infinity)

    def _short_ascii(self):
        """
        A small badge that summarizes the job's internal attributes
        uses ASCII-only characters; same layout as ``_short_unicode()``,
        except that the exception marker is a 2-characters smiley.
        """
        return self._badge(
            done="x", running="o", scheduled=".", idle=">",
            critical="!",
            raised=":(", healthy=":)",
            forever="8")

    def repr_id(self):
        """
        Returns:
          str: the job's id inside the scheduler, or '??' if that was
          not yet set by the scheduler.
        """
        return self._sched_id or '??'

    def repr_short(self):
        """
        Returns:
          str: a 4 characters string (in fact 7 with interspaces)
          that summarizes the 4 dimensions of the job, that is to say

        * its point in the lifecycle (idle → scheduled → running → done)
        * is it declared as forever
        * is it declared as critical
        * did it trigger an exception

        The unicode flavour is used if the terminal is found to support it,
        and the ASCII flavour otherwise.
        """
        if self._detect_support_for_unicode():
            return self._short_unicode()
        return self._short_ascii()

    def _req_csv(self):
        """
        Returns:
          str: the ids of the required jobs, comma-separated and enclosed
          in braces, or an empty string if there is no requirement.
        """
        if not self.required:
            return ""
        return ("{"
                + ", ".join(req.repr_id()
                            for req in self.required)
                + "}")

    def repr_main(self):
        """
        Returns:
          str: standardized body of the object's repr,
          like e.g. ``<SshJob `my command`>``.
        """
        return ("<{} `{}`>"
                .format(type(self).__name__,
                        self._get_text_label()))

    def repr_result(self):
        """
        Returns:
          str: standardized repr's part that shows
          the result or exception of the job.
        """
        exception = self.raised_exception()
        if exception:
            critical_msg = "CRIT. EXC." if self.is_critical() \
                else "exception"
            return ("!! {} => {}:{}!!"
                    .format(critical_msg,
                            type(exception).__name__, exception))
        if self.is_done():
            return "[[ -> {}]]".format(self.result())
        return "[not done]"

    def repr_requires(self):
        """
        Returns:
          str: text part that describes requirements
        """
        if self.required:
            return "requires={}".format(self._req_csv())
        return ""

    def __repr__(self):
        return (
            "{} {} {} {}"
            .format(self.repr_short(),
                    self.repr_main(),
                    self.repr_result(),
                    self.repr_requires()))

    def _add_one_requirement(self, job):
        # refuse to add oneself as a requirement
        # typically useful when adding a pre-requirement
        # in a scenario, like e.g. check_lease
        if job is not self:
            self.required.add(job)

    def _update_one_requirement(self, job, remove):
        # add or drop one single job, depending on remove;
        # dropping a job that is not a requirement is expected
        # to raise KeyError, as documented in requires()
        if remove:
            self.required.remove(job)
        else:
            self._add_one_requirement(job)

    def requires(self, *requirements, remove=False) -> 'AbstractJob':
        """
        Arguments:
          requirements: an iterable of `AbstractJob`
            instances that are added to the requirements.
          remove(bool): if set, the requirement are dropped rather than added.

        Raises:
          KeyError: when trying to remove dependencies that were not present.
          TypeError: when a string is passed as a requirement.

        Returns:
          self: for chaining

        For convenience, any nested structure made of job instances
        can be provided, and if None objects are found, they are silently
        ignored. For example, with `j{1,2,3,4}` being jobs or sequences,
        all the following calls are legitimate:

        * ``j1.requires(None)``
        * ``j1.requires([None])``
        * ``j1.requires((None,))``
        * ``j1.requires(j2)``
        * ``j1.requires(j2, j3)``
        * ``j1.requires([j2, j3])``
        * ``j1.requires(j2, [j3, j4])``
        * ``j1.requires((j2, j3))``
        * ``j1.requires(([j2], [[[j3]]]))``

        * Any of the above with ``remove=True``.

        When a sequence is provided, it is its last job that is
        added to - or dropped from - the requirements.

        For dropping dependencies instead of adding them, use ``remove=True``
        """
        from .sequence import Sequence
        for requirement in requirements:
            if requirement is None:
                continue
            if isinstance(requirement, AbstractJob):
                self._update_one_requirement(requirement, remove)
            elif isinstance(requirement, Sequence):
                # an empty sequence brings no requirement at all
                if requirement.jobs:
                    self._update_one_requirement(
                        requirement.jobs[-1], remove)
            elif isinstance(requirement, (tuple, list, set)):
                for req in requirement:
                    self.requires(req, remove=remove)
            elif isinstance(requirement, (str, bytes)):
                # iterating on a string yields strings again, so
                # the generic branch below would recurse without end
                raise TypeError(
                    "AbstractJob.requires: cannot use {} as a requirement"
                    .format(type(requirement).__name__))
            # not quite sure about what do to here in fact
            # assuming it's some other sort of iterable, e.g. a generator
            else:
                print("WARNING: fishy requirement in AbstractJob.requires")
                self.requires(list(requirement), remove=remove)
        return self

    def is_idle(self):
        """
        Returns:
          bool: ``True`` if the job has not been scheduled already, which
          in other words means that at least one of its requirements
          is not fulfilled.

        Implies `not is_scheduled()`, and so *a fortiori*
        `not is_running` and `not is_done()`.
        """
        return self._task is None

    def is_scheduled(self):
        """
        Returns:
          bool: ``True`` if the job has been scheduled.

        If True, it means that the job's requirements are met, and it has
        proceeded to the windowing system; equivalent to `not is_idle()`.
        """
        return self._task is not None

    def is_running(self):
        """
        Returns:
          bool: once a job starts, it tries to get a slot
          in the windowing system. This method returns ``True`` if the job
          has received the green light from the windowing system.

        Implies `is_scheduled()`.
        """
        return self._running

    def is_done(self):
        """
        Returns:
          bool: ``True`` if the job has completed, i.e. its task has
          returned or raised an exception; a cancelled task
          does not count as done.

        If this method returns ``True``, it implies that
        `is_scheduled()` and `is_running()`
        would also return ``True`` at that time.
        """
        task = self._task
        # finished == no longer pending, and not cancelled either
        return task is not None and task.done() and not task.cancelled()

    def raised_exception(self):
        """
        Returns:
          an exception if the job has completed by raising an exception,
          and None otherwise.
        """
        if self._task is None:
            return None
        # as opposed to Task.exception(), reading the attribute
        # does not raise when the task is pending or cancelled
        return self._task._exception

    def is_critical(self):
        """
        Returns:
          bool: whether this job is a critical job or not.
        """
        return self.critical

    def result(self):
        """
        Returns:
          When this job is completed and has not raised an exception, this
          method lets you retrieve the job's result. i.e. the value returned
          by its `co_run()` method.

        Raises:
          ValueError: if the job is not done yet.
        """
        if not self.is_done():
            raise ValueError("job not finished")
        # as opposed to Task.result(), reading the attribute
        # does not re-raise the job's exception if there was one
        return self._task._result

    async def co_run(self):
        """
        Abstract virtual - needs to be implemented
        """
        print("AbstractJob.co_run() needs to be implemented on class {}"
              .format(self.__class__.__name__))

    async def co_shutdown(self):
        """
        Abstract virtual - needs to be implemented.
        """
        print("AbstractJob.co_shutdown() needs to be implemented on class {}"
              .format(self.__class__.__name__))

    def standalone_run(self):
        """
        A convenience helper that just runs this one job on its own.

        Mostly useful for debugging the internals of that job,
        e.g. for checking for gross mistakes and other exceptions.

        Returns:
          whatever ``co_run()`` returns.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # there is no current event loop, and recent pythons
            # no longer create one implicitly
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(self.co_run())

    def details(self):
        """
        An optional method to implement on concrete job classes; if it
        returns a non None value, these additional details about that job
        will get printed by
        :meth:`asynciojobs.purescheduler.PureScheduler.list()` and
        :meth:`asynciojobs.purescheduler.PureScheduler.debrief()`
        when called with `details=True`.
        """
        return None
class Job(AbstractJob):
    """
    The simplest concrete job class: it wraps a python coroutine
    into an instance of :class:`AbstractJob`.

    Parameters:
      corun: the coroutine that gets awaited when the job runs
      coshutdown: an optional coroutine, that gets awaited when
        the job is shut down
      scheduler: passed to :class:`AbstractJob`
      required: passed to :class:`AbstractJob`
      label: passed to :class:`AbstractJob`

    Example:
      To create a job that prints a message and waits for a fixed delay::

        async def aprint(message, delay):
            print(message)
            await asyncio.sleep(delay)

        j = Job(aprint("Welcome - idling for 3 seconds", 3))
    """

    def __init__(self, corun, *args, coshutdown=None, **kwds):
        # both coroutines are recorded before the base class gets
        # initialized, just like everything else is forwarded as-is
        self.coshutdown = coshutdown
        self.corun = corun
        super().__init__(*args, **kwds)

    def text_label(self):
        """
        Implementation of the method expected by :class:`AbstractJob`;
        the label is built from the coroutine's name when available,
        with a generic fallback otherwise.
        """
        try:
            coroutine_name = self.corun.__name__
            description = "Job[{name} (...)]".format(name=coroutine_name)
        except Exception:                    # pylint:disable=w0703
            description = "Job instance"
        return description

    async def co_run(self):
        """
        Implementation of the method expected by :class:`AbstractJob`;
        awaits the ``corun`` coroutine and returns its result.
        """
        return await self.corun

    async def co_shutdown(self):
        """
        Implementation of the method expected by :class:`AbstractJob`;
        awaits the ``coshutdown`` coroutine if one was provided, and
        returns its result; returns None otherwise.
        """
        if not self.coshutdown:
            return None
        return await self.coshutdown