# 6.4.2. Dispatcher

class Dispatcher(dmap=None, name='', default_values=None, raises=False, description='', caller=None, stopper=None)[source]

A data structure for processing a complex system of functions.

Its purpose is to compute the shortest workflow between input and output data nodes.

Variables: stopper – A semaphore (threading.Event) to abort the dispatching.

Tip

Remember to set stopper to False (i.e., call stopper.clear()) before dispatching ;-)
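As a rough illustration of the tip above, the sketch below uses only the standard library to show how a threading.Event can act as an abort flag. The helper `run_steps` is hypothetical and is not part of the Dispatcher API; it only mimics the idea of checking the stopper between function calls.

```python
import threading


def run_steps(steps, stopper):
    """Run callables in order until `stopper` is set; return the results so far."""
    results = []
    for step in steps:
        if stopper.is_set():  # an abort was requested
            break
        results.append(step())
    return results


stopper = threading.Event()
stopper.clear()  # the "set stopper to False" step from the tip

# The second step requests an abort, so the third step is never executed.
steps = [lambda: 1, lambda: stopper.set() or 2, lambda: 3]
partial = run_steps(steps, stopper)
print(partial)
```

If the event were left set from a previous run, the loop would stop before the first step, which is why the flag is cleared up front.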

A workflow is a sequence of function calls.

***************************************************************************

Example:

As an example, here is a system of equations:

$$b - a = c$$

$$\log(c) = d_{from-log}$$

$$d = (d_{from-log} + d_{initial-guess}) / 2$$

that will be solved assuming that $$a = 0$$, $$b = 1$$, and $$d_{initial-guess} = 4$$.
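For reference, the same system can be evaluated by hand in plain Python. This is only a worked check of the arithmetic under the stated assumptions (a = 0, b = 1, initial guess 4); the variable names are chosen for this sketch and it does not use the dispatcher.

```python
from math import log

a, b, d_initial_guess = 0, 1, 4

c = b - a                               # b - a = c
d_from_log = log(c)                     # log(c) = d_from_log
d = (d_from_log + d_initial_guess) / 2  # average of the two estimations

print(c, d_from_log, d)
```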

Steps

Create an empty dispatcher:

>>> dsp = Dispatcher(name='Dispatcher')


Add data nodes to the dispatcher map:

>>> dsp.add_data(data_id='a')
'a'
>>> dsp.add_data(data_id='c')
'c'


Add a data node with a default value to the dispatcher map:

>>> dsp.add_data(data_id='b', default_value=1)
'b'


Add a function node:

>>> def diff_function(a, b):
...     return b - a
...
>>> dsp.add_function('diff_function', function=diff_function,
...                  inputs=['a', 'b'], outputs=['c'])
'diff_function'


Add a function node with domain:

>>> from math import log
...
>>> def log_domain(x):
...     return x > 0
...
>>> dsp.add_function('log', function=log, inputs=['c'], outputs=['d'],
...                  input_domain=log_domain)
'log'


Add a data node with function estimation and callback function.

• function estimation: estimates one unique output from multiple estimations.
• callback function: is invoked after computing the output.

>>> def average_fun(kwargs):
...     '''
...     Returns the average of node estimations.
...
...     :param kwargs:
...         Node estimations.
...     :type kwargs: dict
...
...     :return:
...         The average of node estimations.
...     :rtype: float
...     '''
...
...     x = kwargs.values()
...     return sum(x) / len(x)
...
>>> def callback_fun(x):
...     print('(log(1) + 4) / 2 = %.1f' % x)
...
>>> dsp.add_data(data_id='d', default_value=4, wait_inputs=True,
...              function=average_fun, callback=callback_fun)
'd'


Dispatch the function calls to achieve the desired output data node d:

>>> outputs = dsp.dispatch(inputs={'a': 0}, outputs=['d'])
(log(1) + 4) / 2 = 2.0
>>> sorted(outputs.items())
[('a', 0), ('b', 1), ('c', 1), ('d', 2.0)]


Methods

• __init__ – Initializes the dispatcher.
• add_data – Add a single data node to the dispatcher.
• add_dispatcher – Add a single sub-dispatcher node to dispatcher.
• add_from_lists – Add multiple function and data nodes to dispatcher.
• add_function – Add a single function node to dispatcher.
• celery – Creates a dispatcher Celery app.
• copy – Returns a copy of the Dispatcher.
• copy_structure
• dispatch – Evaluates the minimum workflow and data outputs of the dispatcher model from given inputs.
• get_full_node_id – Returns the full node id.
• get_node – Returns a sub node of a dispatcher.
• get_sub_dsp – Returns the sub-dispatcher induced by given node and edge bunches.
• get_sub_dsp_from_workflow – Returns the sub-dispatcher induced by the workflow from sources.
• plot – Plots the Dispatcher with a graph in the DOT language with Graphviz.
• remove_cycles – Returns a new dispatcher removing unresolved cycles.
• set_data_remote_link – Set a remote link of a data node in the dispatcher.
• set_default_value – Set the default value of a data node in the dispatcher.
• shrink_dsp – Returns a reduced dispatcher.
• web – Creates a dispatcher Flask app.

__init__(dmap=None, name='', default_values=None, raises=False, description='', caller=None, stopper=None)[source]

Initializes the dispatcher.

Parameters:

• dmap (networkx.DiGraph, optional) – A directed graph that stores data and function parameters.
• name (str, optional) – The dispatcher’s name.
• default_values (dict[str, dict], optional) – Data node default values. These are used as inputs when they are not specified as inputs in the ArciDispatch algorithm.
• raises (bool, optional) – If True, the dispatcher interrupts the dispatch when an error occurs; otherwise it logs a warning.
• description (str, optional) – The dispatcher’s description.
• caller (str, optional) – Who calls my caller?
• stopper (threading.Event, optional) – A semaphore to abort the dispatching.

Attributes

• data_nodes – Returns all data nodes of the dispatcher.
• function_nodes – Returns all function nodes of the dispatcher.
• sub_dsp_nodes – Returns all sub-dispatcher nodes of the dispatcher.

dmap = None

The directed graph that stores data & functions parameters.

name = None

The dispatcher’s name.

nodes = None

The function and data nodes of the dispatcher.

default_values = None

Data node default values. These are used as inputs when they are not specified as inputs in the ArciDispatch algorithm.

weight = None

Weight tag.

raises = None

If True, the dispatcher interrupts the dispatch when an error occurs.

stopper = <threading.Event object>

Stopper to abort the dispatcher execution.

solution = None

Last dispatch solution.

counter = None

Counter to set the node index.

add_data(data_id=None, default_value=empty, initial_dist=0.0, wait_inputs=False, wildcard=None, function=None, callback=None, remote_links=None, description=None, filters=None, **kwargs)[source]

Add a single data node to the dispatcher.

Parameters:

• data_id (str, optional) – Data node id. If None, an id not already in dmap is assigned automatically (‘unknown<%d>’).
• default_value (T, optional) – Data node default value. It is used as input when the node is not specified as an input in the ArciDispatch algorithm.
• initial_dist (float, int, optional) – Initial distance in the ArciDispatch algorithm when the data node default value is used.
• wait_inputs (bool, optional) – If True, the ArciDispatch algorithm stops on the node until it gets all input estimations.
• wildcard (bool, optional) – If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output.
• function (function, optional) – Data node estimation function. This can be any function that takes only one dictionary (key=function node id, value=estimation of data node) as input and returns one value that is the estimation of the data node.
• callback (function, optional) – Callback function to be called after node estimation. This can be any function that takes only one argument, the data node estimation output. It does not return anything.
• remote_links (list[[str, Dispatcher]], optional) – List of parent or child dispatcher nodes, e.g., [[dsp_id, dsp], ...].
• description (str, optional) – Data node’s description.
• filters (list[function], optional) – A list of functions that are invoked after the invocation of the main function.
• kwargs (keyword arguments, optional) – Set additional node attributes using key=value.

Returns: Data node id.

Return type: str
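The interplay between the function and callback parameters can be pictured with a small stand-alone sketch. The helper `estimate_data_node` and the estimation keys are hypothetical names used only for illustration; they show the contract described above (one dictionary in, one value out, then the callback), not how the library implements it.

```python
def estimate_data_node(estimations, function, callback=None):
    """Combine several estimations into one value, then notify the callback."""
    value = function(estimations)  # takes one dict, returns one value
    if callback is not None:
        callback(value)            # called with the estimation, result ignored
    return value


seen = []  # collects what the callback receives
value = estimate_data_node(
    {'log': 0.0, 'initial_guess': 4},  # hypothetical function node ids
    function=lambda kw: sum(kw.values()) / len(kw),
    callback=seen.append,
)
print(value, seen)
```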

***********************************************************************

Example:

Add a data to be estimated or a possible input data node:

>>> dsp.add_data(data_id='a')
'a'


Add a data with a default value (i.e., input data node):

>>> dsp.add_data(data_id='b', default_value=1)
'b'


Create a data node with function estimation and a default value.

• function estimation: estimates one unique output from multiple estimations.
• default value: is a default estimation.

>>> def min_fun(kwargs):
...     '''
...     Returns the minimum value of node estimations.
...
...     :param kwargs:
...         Node estimations.
...     :type kwargs: dict
...
...     :return:
...         The minimum value of node estimations.
...     :rtype: float
...     '''
...
...     return min(kwargs.values())
...
...              function=min_fun)
'c'


Create a data with an unknown id and return the generated id:

>>> dsp.add_data()
'unknown'

add_function(function_id=None, function=None, inputs=None, outputs=None, input_domain=None, weight=None, inp_weight=None, out_weight=None, description=None, filters=None, **kwargs)[source]

Add a single function node to dispatcher.

Parameters:

• function_id (str, optional) – Function node id. If None, an id is derived from the function (the example below yields '...dispatcher:my_function').
• function (function, optional) – Data node estimation function.
• inputs (list, optional) – Ordered arguments (i.e., data node ids) needed by the function.
• outputs (list, optional) – Ordered results (i.e., data node ids) returned by the function.
• input_domain (function, optional) – A function that checks if input values satisfy the function domain. This can be any function that takes the same inputs as the function and returns True if the input values satisfy the domain, otherwise False. When it returns False, the dispatch algorithm does not pass through the node.
• weight (float, int, optional) – Node weight. It is a weight coefficient that is used by the dispatch algorithm to estimate the minimum workflow.
• inp_weight (dict[str, float | int], optional) – Edge weights from data nodes to the function node. It is a dictionary (key=data node id) with the weight coefficients used by the dispatch algorithm to estimate the minimum workflow.
• out_weight (dict[str, float | int], optional) – Edge weights from the function node to data nodes. It is a dictionary (key=data node id) with the weight coefficients used by the dispatch algorithm to estimate the minimum workflow.
• description (str, optional) – Function node’s description.
• filters (list[function], optional) – A list of functions that are invoked after the invocation of the main function.
• kwargs (keyword arguments, optional) – Set additional node attributes using key=value.

Returns: Function node id.

Return type: str
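The role of input_domain can be sketched without the library. The helper `call_with_domain` below is a hypothetical stand-in, and returning None for a skipped node is an assumption of this sketch; it only illustrates that the domain function receives the same arguments as the function and gates the call.

```python
from math import log


def call_with_domain(function, input_domain, *args):
    """Call `function` only when `input_domain` accepts the same arguments."""
    if input_domain is not None and not input_domain(*args):
        return None  # sketch convention: the node is skipped
    return function(*args)


def my_log(a, b):
    return log(b - a)


def my_domain(a, b):
    return a < b  # log(b - a) is defined only for b > a


inside = call_with_domain(my_log, my_domain, 0, 1)   # domain satisfied
outside = call_with_domain(my_log, my_domain, 1, 0)  # domain violated, no call
print(inside, outside)
```

Without the guard, the second call would raise a ValueError from math.log.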

***********************************************************************

Example:

Add a function node:

>>> def my_function(a, b):
...     c = a + b
...     d = a - b
...     return c, d
...
>>> dsp.add_function(function=my_function, inputs=['a', 'b'],
...                  outputs=['c', 'd'])
'...dispatcher:my_function'


Add a function node with domain:

>>> from math import log
>>> def my_log(a, b):
...     return log(b - a)
...
>>> def my_domain(a, b):
...     return a < b
...
>>> dsp.add_function(function=my_log, inputs=['a', 'b'],
...                  outputs=['e'], input_domain=my_domain)
'...dispatcher:my_log'

add_dispatcher(dsp, inputs, outputs, dsp_id=None, input_domain=None, weight=None, inp_weight=None, description=None, include_defaults=False, **kwargs)[source]

Add a single sub-dispatcher node to dispatcher.

Parameters:

• dsp (Dispatcher | dict[str, list]) – Child dispatcher that is added as a sub-dispatcher node to the parent dispatcher.
• inputs (dict[str, str | list[str]]) – Inputs mapping. Data node ids from parent dispatcher to child sub-dispatcher.
• outputs (dict[str, str | list[str]]) – Outputs mapping. Data node ids from child sub-dispatcher to parent dispatcher.
• dsp_id (str, optional) – Sub-dispatcher node id. If None, it is assigned automatically.
• input_domain ((dict) -> bool, optional) – A function that checks if input values satisfy the function domain. This can be any function that takes a dictionary with the inputs of the sub-dispatcher node and returns True if the input values satisfy the domain, otherwise False. Note: this function is invoked every time a data node reaches the sub-dispatcher node.
• weight (float, int, optional) – Node weight. It is a weight coefficient that is used by the dispatch algorithm to estimate the minimum workflow.
• inp_weight (dict[str, int | float], optional) – Edge weights from data nodes to the sub-dispatcher node. It is a dictionary (key=data node id) with the weight coefficients used by the dispatch algorithm to estimate the minimum workflow.
• description (str, optional) – Sub-dispatcher node’s description.
• include_defaults (bool, optional) – If True, the default values of the sub-dispatcher are added to the current dispatcher.
• kwargs (keyword arguments, optional) – Set additional node attributes using key=value.

Returns: Sub-dispatcher node id.

Return type: str

***********************************************************************

Example:

Create a sub-dispatcher:

>>> sub_dsp = Dispatcher()
>>> sub_dsp.add_function('max', max, ['a', 'b'], ['c'])
'max'


Add the sub-dispatcher to the parent dispatcher:

>>> dsp.add_dispatcher(dsp_id='Sub-Dispatcher', dsp=sub_dsp,
...                    inputs={'A': 'a', 'B': 'b'},
...                    outputs={'c': 'C'})
'Sub-Dispatcher'


Add a sub-dispatcher node with domain:

>>> def my_domain(kwargs):
...     return kwargs['C'] > 3
...
>>> dsp.add_dispatcher(dsp_id='Sub-Dispatcher with domain',
...                    dsp=sub_dsp, inputs={'C': 'a', 'D': 'b'},
...                    outputs={'c': 'E'}, input_domain=my_domain)
'Sub-Dispatcher with domain'

add_from_lists(data_list=None, fun_list=None, dsp_list=None)[source]

Add multiple function and data nodes to dispatcher.

Parameters:

• data_list (list[dict], optional) – A list of data node kwargs to be loaded.
• fun_list (list[dict], optional) – A list of function node kwargs to be loaded.
• dsp_list (list[dict], optional) – A list of sub-dispatcher node kwargs to be loaded.

Returns: Data node ids, function node ids, and sub-dispatcher node ids.

Return type: (list[str], list[str], list[str])

***********************************************************************

Example:

Define a data list:

>>> data_list = [
...     {'data_id': 'a'},
...     {'data_id': 'b'},
...     {'data_id': 'c'},
... ]


Define a functions list:

>>> def f(a, b):
...     return a + b
...
>>> fun_list = [
...     {'function': f, 'inputs': ['a', 'b'], 'outputs': ['c']}
... ]


Define a sub-dispatcher list:

>>> sub_dsp = Dispatcher(name='Sub-dispatcher')
>>> sub_dsp.add_function(function=f, inputs=['e', 'f'],
...                      outputs=['g'])
'...:f'
>>>
>>> dsp_list = [
...     {'dsp_id': 'Sub', 'dsp': sub_dsp,
...      'inputs': {'a': 'e', 'b': 'f'}, 'outputs': {'g': 'c'}},
... ]


Add function and data nodes to dispatcher:

>>> dsp.add_from_lists(data_list, fun_list, dsp_list)
(['a', 'b', 'c'], ['...dispatcher:f'], ['Sub'])

set_default_value(data_id, value=empty, initial_dist=0.0)[source]

Set the default value of a data node in the dispatcher.

Parameters:

• data_id (str) – Data node id.
• value (T, optional) – Data node default value. Note: if EMPTY, the previous default value is removed.
• initial_dist (float, int, optional) – Initial distance in the ArciDispatch algorithm when the data node default value is used.
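The set/remove behaviour described above can be mimicked with a plain dictionary. In this sketch, `EMPTY` is a local stand-in sentinel and `set_default` is a hypothetical helper; the stored layout (a dict with 'value' and 'initial_dist' keys) follows the output shown in the example below, but the rest is an assumption.

```python
EMPTY = object()  # stand-in sentinel, not the library's EMPTY


def set_default(default_values, data_id, value=EMPTY, initial_dist=0.0):
    """Store a default for `data_id`, or remove it when `value` is EMPTY."""
    if value is EMPTY:
        default_values.pop(data_id, None)  # remove the previous default, if any
    else:
        default_values[data_id] = {'value': value, 'initial_dist': initial_dist}


defaults = {}
set_default(defaults, 'a', value='value of the data')
snapshot = sorted(defaults['a'].items())

set_default(defaults, 'a', value=EMPTY)  # removes the default again
print(snapshot, defaults)
```

A dedicated sentinel is used instead of None so that None itself remains a valid default value.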

***********************************************************************

Example:

A dispatcher with a data node named a:

>>> dsp = Dispatcher(name='Dispatcher')
>>> dsp.add_data(data_id='a')
'a'


Add a default value to a node:

>>> dsp.set_default_value('a', value='value of the data')
>>> list(sorted(dsp.default_values['a'].items()))
[('initial_dist', 0.0), ('value', 'value of the data')]


Remove the default value of a node:

>>> dsp.set_default_value('a', value=EMPTY)
>>> dsp.default_values
{}


set_data_remote_link

Set a remote link of a data node in the dispatcher.

Parameters:

• data_id (str) – Data node id.
• remote_link ([str, Dispatcher], optional) – Parent or child dispatcher and its node id (id, dsp).
• is_parent (bool) – If True the link is inflow (parent), otherwise it is outflow (child).

get_sub_dsp(nodes_bunch, edges_bunch=None)[source]

Returns the sub-dispatcher induced by given node and edge bunches.

The induced sub-dispatcher contains the available nodes in nodes_bunch and edges between those nodes, excluding those that are in edges_bunch.

The available nodes are non isolated nodes and function nodes that have all inputs and at least one output.

Parameters:

• nodes_bunch (list[str], iterable) – A container of node ids which will be iterated through once.
• edges_bunch (list[(str, str)], iterable, optional) – A container of edge ids that will be removed.

Returns: A dispatcher.

Return type: Dispatcher

Note

The sub-dispatcher edge or node attributes just point to the original dispatcher. So changes to the node or edge structure will not be reflected in the original dispatcher map while changes to the attributes will.
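The note above describes a shallow relationship: the structure is new, the attribute dictionaries are shared. A minimal sketch with plain dictionaries, assuming a hypothetical `induced_subgraph` helper and ignoring the "available nodes" filtering that the real method applies:

```python
def induced_subgraph(nodes, edges, nodes_bunch, edges_bunch=()):
    """Return (sub_nodes, sub_edges) restricted to `nodes_bunch`.

    nodes: {node_id: attr_dict}; edges: {(u, v): attr_dict}.
    Attribute dicts are shared with the originals, not copied.
    """
    keep = set(nodes_bunch) & set(nodes)
    drop = set(edges_bunch)
    sub_nodes = {n: attrs for n, attrs in nodes.items() if n in keep}
    sub_edges = {
        (u, v): attrs
        for (u, v), attrs in edges.items()
        if u in keep and v in keep and (u, v) not in drop
    }
    return sub_nodes, sub_edges


nodes = {'a': {'type': 'data'}, 'fun1': {'type': 'function'}, 'b': {'type': 'data'}}
edges = {('a', 'fun1'): {}, ('fun1', 'b'): {}}

sub_nodes, sub_edges = induced_subgraph(nodes, edges, ['a', 'fun1'])

sub_nodes['a']['description'] = 'edited via the sub-graph'  # attribute change
del sub_nodes['fun1']                                       # structural change

print(nodes['a'], 'fun1' in nodes)
```

The attribute edit is visible in the original mapping, while deleting a node from the sub-graph leaves the original structure untouched.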

***********************************************************************

Example:

A dispatcher with two functions fun1 and fun2:

Get the sub-dispatcher induced by given nodes bunch:

>>> sub_dsp = dsp.get_sub_dsp(['a', 'c', 'd', 'e', 'fun2'])

get_sub_dsp_from_workflow(sources, graph=None, reverse=False, add_missing=False, check_inputs=True)[source]

Returns the sub-dispatcher induced by the workflow from sources.

The induced sub-dispatcher of the dsp contains the reachable nodes and edges evaluated with breadth-first-search on the workflow graph from source nodes.

Parameters:

• sources (list[str], iterable) – Source nodes for the breadth-first search. A container of nodes which will be iterated through once.
• graph (networkx.DiGraph, optional) – A directed graph on which to evaluate the breadth-first search.
• reverse (bool, optional) – If True the workflow graph is assumed to be reversed.
• add_missing (bool, optional) – If True, missing function inputs are added to the sub-dispatcher.
• check_inputs (bool, optional) – If True the missing function inputs are not checked.

Returns: A sub-dispatcher.

Return type: Dispatcher

Note

The sub-dispatcher edge or node attributes just point to the original dispatcher. So changes to the node or edge structure will not be reflected in the original dispatcher map while changes to the attributes will.
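The breadth-first search mentioned in the description can be sketched on an adjacency dictionary. The helper `reachable` and the small workflow graph are hypothetical; the real method works on the dispatcher's workflow graph and also handles missing inputs, which this sketch does not.

```python
from collections import deque


def reachable(graph, sources, reverse=False):
    """Return the nodes reachable from `sources` in breadth-first order."""
    if reverse:
        # Build the reversed adjacency so the search walks edges backwards.
        reversed_graph = {}
        for u, successors in graph.items():
            reversed_graph.setdefault(u, [])
            for v in successors:
                reversed_graph.setdefault(v, []).append(u)
        graph = reversed_graph

    order, visited, queue = [], set(), deque()
    for source in sources:
        if source in graph and source not in visited:
            visited.add(source)
            order.append(source)
            queue.append(source)

    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, ()):
            if nxt not in visited:
                visited.add(nxt)
                order.append(nxt)
                queue.append(nxt)
    return order


# Hypothetical workflow: a and b feed fun, which produces c.
workflow = {'a': ['fun'], 'b': ['fun'], 'fun': ['c'], 'c': []}

forward = reachable(workflow, ['a', 'b'])
backward = reachable(workflow, ['c'], reverse=True)
print(forward, backward)
```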

***********************************************************************

Example:

A dispatcher with a function fun and a node a with a default value:

Dispatch with no calls in order to have a workflow:

>>> o = dsp.dispatch(inputs=['a', 'b'], no_call=True)


Get sub-dispatcher from workflow inputs a and b:

>>> sub_dsp = dsp.get_sub_dsp_from_workflow(['a', 'b'])


Get sub-dispatcher from a workflow output c:

>>> sub_dsp = dsp.get_sub_dsp_from_workflow(['c'], reverse=True)

get_node(*node_ids, node_attr='auto')[source]

Returns a sub node of a dispatcher.

Parameters:

• node_ids (str) – A sequence of node ids or a single node id. The id order identifies a dispatcher sub-level.
• node_attr (str, None, optional) – Output node attr. If the searched node does not have this attribute, all its attributes are returned.
  • When ‘auto’, returns the “default” attributes of the searched node, which are: for a data node, its output and, if it does not exist, all its attributes; for function and sub-dispatcher nodes, the ‘function’ attribute.
  • When ‘description’, returns the “description” of the searched node, searching also in function or sub-dispatcher input/output description.
  • When ‘output’, returns the data node output.
  • When ‘default_value’, returns the data node default value.
  • When ‘value_type’, returns the data node value’s type.
  • When None, returns the node attributes.

Returns: Node attributes and its real path.

Return type: (T, (str, ...))

Example:

Get the sub node output:

>>> dsp.get_node('Sub-dispatcher', 'c')
(4, ('Sub-dispatcher', 'c'))
>>> dsp.get_node('Sub-dispatcher', 'c', node_attr='type')
('data', ('Sub-dispatcher', 'c'))

>>> sub_dsp, sub_dsp_id = dsp.get_node('Sub-dispatcher')

get_full_node_id(*node_ids)[source]

Returns the full node id.

Parameters:

• node_ids (str) – A sequence of node ids or a single node id. The id order identifies a dispatcher sub-level. If it is empty, the full id of the dispatcher is returned.

Returns: Full node id and related dispatchers.

Return type: tuple[str], tuple[Dispatcher]

data_nodes

Returns all data nodes of the dispatcher.

Returns: All data nodes of the dispatcher. dict[str, dict]
function_nodes

Returns all function nodes of the dispatcher.

Returns: All function nodes of the dispatcher. dict[str, dict]
sub_dsp_nodes

Returns all sub-dispatcher nodes of the dispatcher.

Returns: All sub-dispatcher nodes of the dispatcher. dict[str, dict]
copy()[source]

Returns a copy of the Dispatcher.

Returns: A copy of the Dispatcher. Dispatcher

Example:

>>> dsp = Dispatcher()
>>> dsp is dsp.copy()
False

plot(workflow=False, view=True, nested=True, edge_data=(), node_data=(), node_function=(), draw_outputs=0, node_styles=None, depth=-1, function_module=False, name=None, comment=None, directory=None, filename=None, format='svg', engine=None, encoding=None, graph_attr=None, node_attr=None, edge_attr=None, body=None)[source]

Plots the Dispatcher with a graph in the DOT language with Graphviz.

Parameters:

• workflow (bool, optional) – If True the latest solution will be plotted, otherwise the dmap.
• view (bool, optional) – Open the rendered directed graph in the DOT language with the system default opener.
• nested (bool, optional) – If False the sub-dispatcher nodes are plotted on the same graph, otherwise they can be viewed by clicking on the node that has a URL link.
• edge_data (tuple[str], optional) – Edge attributes to view.
• node_data (tuple[str], optional) – Data node attributes to view.
• node_function (tuple[str], optional) – Function node attributes to view.
• draw_outputs (int, optional) – Modifies the default data node and edge attributes to view. If draw_outputs is:
  • 1: node attribute ‘output’ is drawn.
  • 2: edge attribute ‘value’ is drawn.
  • 3: node ‘output’ and edge ‘value’ attributes are drawn.
  • otherwise: node ‘output’ and edge ‘value’ attributes are not drawn.
• node_styles (dict[str|Token, dict[str, str]]) – Default node styles according to graphviz node attributes.
• depth (int, optional) – Depth of sub-dispatch plots. If negative all levels are plotted.
• function_module (bool, optional) – If True the function labels are plotted with the function module, otherwise only the function name will be visible.
• name (str) – Graph name used in the source code.
• comment (str) – Comment added to the first line of the source.
• directory (str, optional) – (Sub)directory for source saving and rendering.
• filename (str, optional) – File name for saving the source.
• format (str, optional) – Rendering output format (‘pdf’, ‘png’, ...).
• engine (str, optional) – Layout command used (‘dot’, ‘neato’, ...).
• encoding (str, optional) – Encoding for saving the source.
• graph_attr (dict, optional) – Dict of (attribute, value) pairs for the graph.
• node_attr (dict, optional) – Dict of (attribute, value) pairs set for all nodes.
• edge_attr (dict, optional) – Dict of (attribute, value) pairs set for all edges.
• body (dict, optional) – Dict of (attribute, value) pairs to add to the graph body.

Returns: A directed graph source code in the DOT language.

Return type: graphviz.dot.Digraph

Example:

>>> dsp = Dispatcher(name='Dispatcher')
>>> def fun(a):
...     return a + 1, a - 1
>>> dsp.add_function('fun', fun, ['a'], ['b', 'c'])
'fun'
>>> dsp.plot(view=False, graph_attr={'ratio': '1'})
<co2mpas.dispatcher.utils.drw.DspPlot object at 0x...>

web(import_name=None, **options)[source]

Creates a dispatcher Flask app.

Parameters:

• import_name (str, optional) – The name of the application package.
• options (dict, optional) – Flask options.

Returns: Flask app based on the given dispatcher.

Return type: flask.Flask

celery(import_name=None, **options)[source]

Creates a dispatcher Celery app.

Parameters:

• import_name (str, optional) – The name of the application package.
• options (dict, optional) – Celery options.

Returns: Celery app based on the given dispatcher.

Return type: celery.Celery

remove_cycles(sources)[source]

Returns a new dispatcher removing unresolved cycles.

An unresolved cycle is a cycle that cannot be removed by the ArciDispatch algorithm.
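To make the notion of a cycle concrete, the sketch below finds one cycle in an adjacency dictionary with a depth-first search. It only detects a cycle; it does not decide whether the ArciDispatch algorithm could resolve it, and it is not how remove_cycles is implemented. The helper `find_cycle` and the edge from a to c are assumptions of this sketch.

```python
def find_cycle(graph, start):
    """Return one cycle reachable from `start` as a list of nodes, or None."""
    path, on_path, finished = [], set(), set()

    def visit(node):
        path.append(node)
        on_path.add(node)
        for nxt in graph.get(node, ()):
            if nxt in on_path:
                # Back edge: the cycle is the path segment starting at `nxt`.
                return path[path.index(nxt):] + [nxt]
            if nxt not in finished:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        on_path.discard(node)
        finished.add(node)
        return None

    return visit(start)


# The cycle c -> min1 -> d -> min2 -> c, entered from a (hypothetical edge).
graph = {'a': ['c'], 'c': ['min1'], 'min1': ['d'], 'd': ['min2'], 'min2': ['c']}
cycle = find_cycle(graph, 'a')
print(cycle)
```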

Parameters:

• sources (list[str], iterable) – Input data nodes.

Returns: A new dispatcher without the unresolved cycles.

Return type: Dispatcher

***********************************************************************

Example:

A dispatcher with an unresolved cycle (i.e., c –> min1 –> d –> min2 –> c):

The dispatch stops on data node c due to the unresolved cycle:

>>> res = dsp.dispatch(inputs={'a': 1})
>>> sorted(res.items())
[('a', 1), ('b', 3)]


Removing the unresolved cycle the dispatch continues to all nodes:

>>> dsp_rm_cy = dsp.remove_cycles(['a', 'b'])
>>> res = dsp_rm_cy.dispatch(inputs={'a': 1})
>>> sorted(res.items())
[('a', 1), ('b', 3), ('c', 3.0), ('d', 1)]

dispatch(inputs=None, outputs=None, cutoff=None, inputs_dist=None, wildcard=False, no_call=False, shrink=False, rm_unused_nds=False, select_output_kw=None, _wait_in=None, stopper=None)[source]

Evaluates the minimum workflow and data outputs of the dispatcher model from given inputs.

Parameters:

• inputs (dict[str, T], list[str], iterable, optional) – Input data values.
• outputs (list[str], iterable, optional) – Ending data nodes.
• cutoff (float, int, optional) – Depth to stop the search.
• inputs_dist (dict[str, int | float], optional) – Initial distances of input data nodes.
• wildcard (bool, optional) – If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output.
• no_call (bool, optional) – If True, the data node estimation function is not used and the input values are not used.
• shrink (bool, optional) – If True the dispatcher is shrunk before the dispatch. See also shrink_dsp().
• rm_unused_nds (bool, optional) – If True unused function and sub-dispatcher nodes are removed from workflow.
• select_output_kw (dict, optional) – Kwargs of selector function to select specific outputs.
• _wait_in (dict, optional) – Override wait inputs.
• stopper (threading.Event, optional) – A semaphore to abort the dispatching.

Returns: Dictionary of estimated data node outputs.

Return type: Result[str, T]
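How inputs, default values, and outputs interact can be pictured with a toy forward-chaining loop. This is a deliberately simplified sketch: `toy_dispatch` is a hypothetical helper that ignores weights, distances, domains, and the shortest-workflow search, so it is not the ArciDispatch algorithm.

```python
from math import log


def toy_dispatch(functions, defaults, inputs=None, outputs=()):
    """Fire functions whose inputs are known until `outputs` are available.

    functions: {function_id: (callable, input ids, output ids)}.
    """
    values = {**defaults, **(inputs or {})}  # explicit inputs override defaults
    pending = dict(functions)

    def done():
        return bool(outputs) and all(o in values for o in outputs)

    progress = True
    while progress and not done():
        progress = False
        for fid, (func, ins, outs) in list(pending.items()):
            if done():
                break
            if all(i in values for i in ins):
                result = func(*(values[i] for i in ins))
                results = result if len(outs) > 1 else (result,)
                for key, val in zip(outs, results):
                    values.setdefault(key, val)  # keep given inputs untouched
                del pending[fid]
                progress = True
    return values


functions = {
    'diff': (lambda a, b: b - a, ['a', 'b'], ['c']),
    'log': (log, ['c'], ['d_from_log']),
}

full = toy_dispatch(functions, defaults={'b': 1}, inputs={'a': 0})
until_c = toy_dispatch(functions, defaults={'b': 1}, inputs={'a': 0}, outputs=['c'])
print(sorted(full.items()), sorted(until_c.items()))
```

Passing outputs stops the loop as soon as the requested nodes are known, which loosely mirrors the "dispatch until data node c is estimated" case in the example below.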

***********************************************************************

Example:

A dispatcher with a function $$log(b - a)$$ and two data a and b with default values:

Dispatch without inputs. The default values are used as inputs:

>>> outputs = dsp.dispatch()
...
>>> sorted(outputs.items())
[('a', 0), ('b', 5), ('c', 0), ('d', 1), ('e', 0.0)]


Dispatch until data node c is estimated:

>>> outputs = dsp.dispatch(outputs=['c'])
...
>>> sorted(outputs.items())
[('a', 0), ('b', 5), ('c', 0)]


Dispatch with one input. The default value of a is not used as an input:

>>> outputs = dsp.dispatch(inputs={'a': 3})
...
>>> sorted(outputs.items())
[('a', 3), ('b', 5), ('c', 3), ('d', 1)]

shrink_dsp(inputs=None, outputs=None, cutoff=None, inputs_dist=None, wildcard=True)[source]

Returns a reduced dispatcher.

Parameters:

• inputs (list[str], iterable, optional) – Input data nodes.
• outputs (list[str], iterable, optional) – Ending data nodes.
• cutoff (float, int, optional) – Depth to stop the search.
• inputs_dist (dict[str, int | float], optional) – Initial distances of input data nodes.
• wildcard (bool, optional) – If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output.

Returns: A sub-dispatcher.

Return type: Dispatcher

***********************************************************************

Example:

A dispatcher like this:

Get the sub-dispatcher induced by dispatching with no calls from inputs a, b, and d to outputs c and f:

>>> shrink_dsp = dsp.shrink_dsp(inputs=['a', 'b', 'd'],
...                             outputs=['c', 'f'])