http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py index b6fbb94..fc54f75 100644 --- a/aria/workflows/builtin/workflows.py +++ b/aria/workflows/builtin/workflows.py @@ -21,6 +21,8 @@ from itertools import groupby from aria import workflow +from ..api import task + __all__ = ( 'install_node_instance', @@ -32,43 +34,43 @@ __all__ = ( # Install node instance workflow and sub workflows @workflow(suffix_template='{node_instance.id}') -def install_node_instance(context, graph, node_instance): +def install_node_instance(ctx, graph, node_instance): """ A workflow which installs a node instance. - :param WorkflowContext context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the tasks graph of which to edit :param node_instance: the node instance to install :return: """ - create_node_instance = context.operation( + create_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.create.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations[ - 'aria.interfaces.lifecycle.create'], + operation_details=node_instance.node.operations['aria.interfaces.lifecycle.create'], node_instance=node_instance ) - configure_node_instance = context.operation( + configure_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.configure.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.configure'], node_instance=node_instance ) - start_node_instance = context.operation( + start_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.start.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations[ - 'aria.interfaces.lifecycle.start'], + 
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.start'], node_instance=node_instance ) - graph.chain(tasks=[ + + graph.sequence( create_node_instance, - preconfigure_relationship(context=context, node_instance=node_instance), + preconfigure_relationship(graph, ctx, node_instance), configure_node_instance, - postconfigure_relationship(context=context, node_instance=node_instance), + postconfigure_relationship(graph, ctx, node_instance), start_node_instance, - establish_relationship(context=context, node_instance=node_instance), - ]) + establish_relationship(graph, ctx, node_instance) + ) + return graph -@workflow(suffix_template='{node_instance.id}') -def preconfigure_relationship(context, graph, node_instance): + +def preconfigure_relationship(graph, ctx, node_instance): """ :param context: @@ -76,14 +78,14 @@ def preconfigure_relationship(context, graph, node_instance): :param node_instance: :return: """ - graph.chain(tasks=relationships_tasks( + return relationships_tasks( + graph=graph, operation_name='aria.interfaces.relationship_lifecycle.preconfigure', - context=context, - node_instance=node_instance)) + context=ctx, + node_instance=node_instance) -@workflow(suffix_template='{node_instance.id}') -def postconfigure_relationship(context, graph, node_instance): +def postconfigure_relationship(graph, ctx, node_instance): """ :param context: @@ -91,14 +93,14 @@ def postconfigure_relationship(context, graph, node_instance): :param node_instance: :return: """ - graph.chain(tasks=relationships_tasks( + return relationships_tasks( + graph=graph, operation_name='aria.interfaces.relationship_lifecycle.postconfigure', - context=context, - node_instance=node_instance)) + context=ctx, + node_instance=node_instance) -@workflow(suffix_template='{node_instance.id}') -def establish_relationship(context, graph, node_instance): +def establish_relationship(graph, ctx, node_instance): """ :param context: @@ -106,16 +108,17 @@ def 
establish_relationship(context, graph, node_instance): :param node_instance: :return: """ - graph.chain(tasks=relationships_tasks( + return relationships_tasks( + graph=graph, operation_name='aria.interfaces.relationship_lifecycle.establish', - context=context, - node_instance=node_instance)) + context=ctx, + node_instance=node_instance) # Uninstall node instance workflow and subworkflows @workflow(suffix_template='{node_instance.id}') -def uninstall_node_instance(graph, context, node_instance): +def uninstall_node_instance(ctx, graph, node_instance): """ A workflow which uninstalls a node instance. :param WorkflowContext context: the workflow context @@ -123,28 +126,25 @@ def uninstall_node_instance(graph, context, node_instance): :param node_instance: the node instance to uninstall :return: """ - stop_node_instance = context.operation( + stop_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.stop.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations[ - 'aria.interfaces.lifecycle.stop'], + operation_details=node_instance.node.operations['aria.interfaces.lifecycle.stop'], node_instance=node_instance ) - delete_node_instance = context.operation( + delete_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.delete.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations[ - 'aria.interfaces.lifecycle.delete'], + operation_details=node_instance.node.operations['aria.interfaces.lifecycle.delete'], node_instance=node_instance ) - graph.chain(tasks=[ + graph.sequence( stop_node_instance, - unlink_relationship(context=context, node_instance=node_instance), - delete_node_instance, - ]) + unlink_relationship(graph, ctx, node_instance), + delete_node_instance + ) -@workflow(suffix_template='{node_instance.id}') -def unlink_relationship(context, graph, node_instance): +def unlink_relationship(graph, ctx, node_instance): """ :param context: @@ -152,19 +152,15 @@ def unlink_relationship(context, 
graph, node_instance): :param node_instance: :return: """ - tasks = relationships_tasks( + return relationships_tasks( + graph=graph, operation_name='aria.interfaces.relationship_lifecycle.unlink', - context=context, + context=ctx, node_instance=node_instance ) - graph.chain(tasks=tasks) - return tasks -@workflow(suffix_template='{node_instance.id}.{operation}') def execute_operation_on_instance( - context, - graph, node_instance, operation, operation_kwargs, @@ -187,16 +183,18 @@ def execute_operation_on_instance( node_instance=node_instance, operation_name=operation) - graph.add_task( - context.operation( - name=task_name, - operation_details=node_instance.node.operations[operation], - node_instance=node_instance, - parameters=operation_kwargs) - ) + return task.OperationTask( + name=task_name, + operation_details=node_instance.node.operations[operation], + node_instance=node_instance, + inputs=operation_kwargs) -def relationships_tasks(operation_name, context, node_instance): + +def relationships_tasks(graph, + operation_name, + context, + node_instance): """ Creates a relationship task (source and target) for all of a node_instance relationships. :param basestring operation_name: the relationship operation name. 
@@ -211,17 +209,24 @@ def relationships_tasks(operation_name, context, node_instance): sub_tasks = [] for index, (_, relationship_group) in enumerate(relationships_groups): for relationship_instance in relationship_group: - relationship_subgraph = relationship_tasks( + relationship_operations = relationship_tasks( + graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, context=context, operation_name=operation_name, index=index) - sub_tasks.append(relationship_subgraph) - return sub_tasks + sub_tasks.append(relationship_operations) + + return graph.sequence(*sub_tasks) -def relationship_tasks(node_instance, relationship_instance, context, operation_name, index=None): +def relationship_tasks(graph, + node_instance, + relationship_instance, + context, + operation_name, + index=None): """ Creates a relationship task source and target. :param NodeInstance node_instance: the node instance of the relationship @@ -232,29 +237,20 @@ def relationship_tasks(node_instance, relationship_instance, context, operation_ :return: """ index = index or node_instance.relationship_instances.index(relationship_instance) - sub_workflow_name = '{name}.{index}.{node_instance.id}'.format( - name=operation_name, - index=index, - node_instance=node_instance, - ) operation_name_template = '{name}.{index}.{{0}}.<{source_id}, {target_id}>'.format( name=operation_name, index=index, source_id=node_instance.id, target_id=relationship_instance.target_id, ) - source_operation = context.operation( + source_operation = task.OperationTask( name=operation_name_template.format('source'), node_instance=node_instance, operation_details=relationship_instance.relationship.source_operations[ operation_name]) - target_operation = context.operation( + target_operation = task.OperationTask( name=operation_name_template.format('target'), - node_instance=context.storage.node_instance.get( - relationship_instance.target_id), + 
node_instance=context.model.node_instance.get(relationship_instance.target_id), operation_details=relationship_instance.relationship.target_operations[ operation_name]) - sub_graph = context.task_graph(name=sub_workflow_name) - sub_graph.add_task(source_operation) - sub_graph.add_task(target_operation) - return sub_graph + return graph.add_tasks(source_operation, target_operation)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py index ae1e83e..646f44a 100644 --- a/aria/workflows/core/__init__.py +++ b/aria/workflows/core/__init__.py @@ -12,3 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +""" +Core for the workflow execution mechanism +""" + +from . import task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index a00bc84..a288757 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -22,12 +22,11 @@ from datetime import datetime import networkx -from aria import events, logger -from aria.storage import models - +from ... import events, logger +from ...storage import models from .. import exceptions from . import translation -from . import tasks +from . 
import task as engine_task class Engine(logger.LoggerMixin): @@ -41,7 +40,6 @@ class Engine(logger.LoggerMixin): self._execution_graph = networkx.DiGraph() self._executor = executor translation.build_execution_graph(task_graph=tasks_graph, - workflow_context=workflow_context, execution_graph=self._execution_graph) def execute(self): @@ -67,13 +65,12 @@ class Engine(logger.LoggerMixin): def _executable_tasks(self): now = datetime.now() return (task for task in self._tasks_iter() - if task.status == models.Operation.PENDING and + if task.status == models.Task.PENDING and task.eta <= now and not self._task_has_dependencies(task)) def _ended_tasks(self): - return (task for task in self._tasks_iter() - if task.status in models.Operation.END_STATES) + return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES) def _task_has_dependencies(self, task): return len(self._execution_graph.pred.get(task.id, {})) > 0 @@ -85,14 +82,14 @@ class Engine(logger.LoggerMixin): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) def _handle_executable_task(self, task): - if isinstance(task, tasks.BaseWorkflowTask): - task.status = models.Operation.SUCCESS + if isinstance(task, engine_task.BaseWorkflowTask): + task.status = models.Task.SUCCESS else: events.sent_task_signal.send(task) self._executor.execute(task) def _handle_ended_tasks(self, task): - if task.status == models.Operation.FAILED: + if task.status == models.Task.FAILED: raise exceptions.ExecutorException('Workflow failed') else: self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py new file mode 100644 index 0000000..fc72b59 --- /dev/null +++ b/aria/workflows/core/task.py @@ -0,0 +1,200 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow tasks +""" +from contextlib import contextmanager +from datetime import datetime + +from ... import logger +from ...storage import models +from .. import exceptions + + +class BaseTask(logger.LoggerMixin): + """ + Base class for Task objects + """ + + def __init__(self, id, *args, **kwargs): + super(BaseTask, self).__init__(*args, **kwargs) + self._id = id + + @property + def id(self): + """ + :return: the task's id + """ + return self._id + + +class BaseWorkflowTask(BaseTask): + """ + Base class for all workflow wrapping tasks + """ + + def __init__(self, *args, **kwargs): + super(BaseWorkflowTask, self).__init__(*args, **kwargs) + self.status = models.Task.PENDING + self.eta = datetime.now() + + +class StartWorkflowTask(BaseWorkflowTask): + """ + Tasks marking a workflow start + """ + pass + + +class EndWorkflowTask(BaseWorkflowTask): + """ + Tasks marking a workflow end + """ + pass + + +class StartSubWorkflowTask(BaseWorkflowTask): + """ + Tasks marking a subworkflow start + """ + pass + + +class EndSubWorkflowTask(BaseWorkflowTask): + """ + Tasks marking a subworkflow end + """ + pass + + +class OperationTask(BaseTask, logger.LoggerMixin): + """ + Operation tasks + """ + + def __init__(self, api_task, *args, **kwargs): + 
super(OperationTask, self).__init__(id=api_task.id, **kwargs) + self._workflow_ctx = api_task.workflow_context + task_model = api_task.workflow_context.model.task.model_cls + task = task_model( + name=api_task.name, + operation_details=api_task.operation_details, + node_instance=api_task.node_instance, + inputs=api_task.inputs, + status=task_model.PENDING, + execution_id=self.workflow_context.execution_id, + max_retries=self.workflow_context.parameters.get('max_retries', 1), + ) + self.workflow_context.model.task.store(task) + self._task_id = task.id + self._update_fields = None + + @contextmanager + def update(self): + """ + A context manager which puts the task into update mode, enabling fields update. + :yields: None + """ + self._update_fields = {} + try: + yield + task = self.context + for key, value in self._update_fields.items(): + setattr(task, key, value) + self.context = task + finally: + self._update_fields = None + + @property + def workflow_context(self): + """ + :return: the task's name + """ + return self._workflow_ctx + + @property + def context(self): + """ + Returns the task model in storage + :return: task in storage + """ + return self.workflow_context.model.task.get(self._task_id) + + @context.setter + def context(self, value): + self.workflow_context.model.task.store(value) + + @property + def status(self): + """ + Returns the task status + :return: task status + """ + return self.context.status + + @status.setter + def status(self, value): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + self._update_fields['status'] = value + + @property + def started_at(self): + """ + Returns when the task started + :return: when task started + """ + return self.context.started_at + + @started_at.setter + def started_at(self, value): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + self._update_fields['started_at'] = value + + @property + def ended_at(self): 
+ """ + Returns when the task ended + :return: when task ended + """ + return self.context.ended_at + + @ended_at.setter + def ended_at(self, value): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + self._update_fields['ended_at'] = value + + @property + def retry_count(self): + """ + Returns the retry count for the task + :return: retry count + """ + return self.context.retry_count + + @retry_count.setter + def retry_count(self, value): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + self._update_fields['retry_count'] = value + + def __getattr__(self, attr): + try: + return getattr(self.context, attr) + except AttributeError: + return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py deleted file mode 100644 index 98d7c13..0000000 --- a/aria/workflows/core/tasks.py +++ /dev/null @@ -1,121 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Workflow tasks -""" - -from datetime import datetime - -from aria import logger -from aria.storage import models - - -class BaseTask(logger.LoggerMixin): - """ - Base class for Task objects - """ - - def __init__(self, id, name, context, *args, **kwargs): - super(BaseTask, self).__init__(*args, **kwargs) - self._id = id - self._name = name - self._context = context - - @property - def id(self): - """ - :return: the task's id - """ - return self._id - - @property - def name(self): - """ - :return: the task's name - """ - return self._name - - @property - def context(self): - """ - :return: the task's context - """ - return self._context - - -class BaseWorkflowTask(BaseTask): - """ - Base class for all workflow wrapping tasks - """ - - def __init__(self, *args, **kwargs): - super(BaseWorkflowTask, self).__init__(*args, **kwargs) - self.status = models.Operation.PENDING - self.eta = datetime.now() - - -class StartWorkflowTask(BaseWorkflowTask): - """ - Tasks marking a workflow start - """ - pass - - -class EndWorkflowTask(BaseWorkflowTask): - """ - Tasks marking a workflow end - """ - pass - - -class StartSubWorkflowTask(BaseWorkflowTask): - """ - Tasks marking a subworkflow start - """ - pass - - -class EndSubWorkflowTask(BaseWorkflowTask): - """ - Tasks marking a subworkflow end - """ - pass - - -class OperationTask(BaseTask): - """ - Operation tasks - """ - - def __init__(self, *args, **kwargs): - super(OperationTask, self).__init__(*args, **kwargs) - self._create_operation_in_storage() - - def _create_operation_in_storage(self): - operation_cls = self.context.model.operation.model_cls - operation = operation_cls( - id=self.context.id, - execution_id=self.context.execution_id, - max_retries=self.context.parameters.get('max_retries', 1), - status=operation_cls.PENDING, - ) - self.context.operation = operation - - def __getattr__(self, attr): - try: - return getattr(self.context.operation, attr) - except AttributeError: - return super(OperationTask, 
self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py index dc483c6..cd9a0e6 100644 --- a/aria/workflows/core/translation.py +++ b/aria/workflows/core/translation.py @@ -17,17 +17,15 @@ Translation of user graph's API to the execution graph """ -from aria import contexts - -from . import tasks +from . import task as core_task +from .. import api def build_execution_graph( task_graph, - workflow_context, execution_graph, - start_cls=tasks.StartWorkflowTask, - end_cls=tasks.EndWorkflowTask, + start_cls=core_task.StartWorkflowTask, + end_cls=core_task.EndWorkflowTask, depends_on=()): """ Translates the user graph to the execution graph @@ -39,43 +37,36 @@ def build_execution_graph( :param depends_on: internal use """ # Insert start marker - start_task = start_cls(id=_start_graph_suffix(task_graph.id), - name=_start_graph_suffix(task_graph.name), - context=workflow_context) + start_task = start_cls(id=_start_graph_suffix(task_graph.id)) _add_task_and_dependencies(execution_graph, start_task, depends_on) - for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): + for api_task in task_graph.topological_order(reverse=True): + dependencies = task_graph.get_dependencies(api_task) operation_dependencies = _get_tasks_from_dependencies( execution_graph, dependencies, default=[start_task]) - if _is_operation(operation_or_workflow): + if _is_operation(api_task): # Add the task an the dependencies - operation_task = tasks.OperationTask(id=operation_or_workflow.id, - name=operation_or_workflow.name, - context=operation_or_workflow) + operation_task = core_task.OperationTask(api_task) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) else: # Built the graph recursively while adding start and 
end markers build_execution_graph( - task_graph=operation_or_workflow, - workflow_context=workflow_context, + task_graph=api_task, execution_graph=execution_graph, - start_cls=tasks.StartSubWorkflowTask, - end_cls=tasks.EndSubWorkflowTask, + start_cls=core_task.StartSubWorkflowTask, + end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies ) # Insert end marker workflow_dependencies = _get_tasks_from_dependencies( execution_graph, - task_graph.leaf_tasks, + _get_non_dependency_tasks(task_graph), default=[start_task]) - end_task = end_cls( - id=_end_graph_suffix(task_graph.id), - name=_end_graph_suffix(task_graph.name), - context=workflow_context) + end_task = end_cls(id=_end_graph_suffix(task_graph.id)) _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) @@ -95,7 +86,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): def _is_operation(task): - return isinstance(task, contexts.OperationContext) + return isinstance(task, api.task.OperationTask) def _start_graph_suffix(id): @@ -104,3 +95,9 @@ def _start_graph_suffix(id): def _end_graph_suffix(id): return '{0}-End'.format(id) + + +def _get_non_dependency_tasks(graph): + for task in graph.tasks: + if len(list(graph.get_dependents(task))) == 0: + yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/workflows/exceptions.py b/aria/workflows/exceptions.py index b8ebc14..d7b189d 100644 --- a/aria/workflows/exceptions.py +++ b/aria/workflows/exceptions.py @@ -61,3 +61,10 @@ class AriaEngineError(Exception): """ Raised by the workflow engine """ + + +class TaskException(Exception): + """ + Raised by the task + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git 
a/aria/workflows/executor/blocking.py b/aria/workflows/executor/blocking.py index 86171ba..f072d8a 100644 --- a/aria/workflows/executor/blocking.py +++ b/aria/workflows/executor/blocking.py @@ -29,9 +29,8 @@ class CurrentThreadBlockingExecutor(BaseExecutor): def execute(self, task): self._task_started(task) try: - operation_context = task.context - task_func = module.load_attribute(operation_context.operation_details['operation']) - task_func(**operation_context.inputs) + task_func = module.load_attribute(task.operation_details['operation']) + task_func(**task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py index 2d486f2..a82a6b7 100644 --- a/aria/workflows/executor/celery.py +++ b/aria/workflows/executor/celery.py @@ -43,11 +43,10 @@ class CeleryExecutor(BaseExecutor): self._started_queue.get(timeout=30) def execute(self, task): - operation_context = task.context self._tasks[task.id] = task self._results[task.id] = self._app.send_task( - operation_context.operation_details['operation'], - kwargs=operation_context.inputs, + task.operation_details['operation'], + kwargs=task.inputs, task_id=task.id, queue=self._get_queue(task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py index e6faf5f..4af08c0 100644 --- a/aria/workflows/executor/multiprocess.py +++ b/aria/workflows/executor/multiprocess.py @@ -40,16 +40,15 @@ class MultiprocessExecutor(BaseExecutor): self._listener_thread = threading.Thread(target=self._listener) 
self._listener_thread.daemon = True self._listener_thread.start() - self._pool = multiprocessing.Pool(processes=pool_size, - maxtasksperchild=1) + self._pool = multiprocessing.Pool(processes=pool_size) def execute(self, task): self._tasks[task.id] = task self._pool.apply_async(_multiprocess_handler, args=( self._queue, task.id, - task.context.operation_details, - task.context.inputs)) + task.operation_details, + task.inputs)) def close(self): self._pool.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py index dfc0f18..180c482 100644 --- a/aria/workflows/executor/thread.py +++ b/aria/workflows/executor/thread.py @@ -55,10 +55,8 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - operation_context = task.context - task_func = module.load_attribute( - operation_context.operation_details['operation']) - task_func(**operation_context.inputs) + task_func = module.load_attribute(task.operation_details['operation']) + task_func(**task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 1240b72..b550a58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,6 @@ networkx==1.9 requests==2.7.0 retrying==1.3.3 blinker==1.4 +importlib==1.0.4 ; python_version < '2.7' +ordereddict==1.1 ; python_version < '2.7' jsonpickle http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 811810b..6d7a30c 100644 --- 
a/setup.py +++ b/setup.py @@ -36,10 +36,6 @@ try: except IOError: install_requires = [] -try: - import importlib -except ImportError: - install_requires.append('importlib') setup( name=_PACKAGE_NAME, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/.pylintrc ---------------------------------------------------------------------- diff --git a/tests/.pylintrc b/tests/.pylintrc index 0f84473..c455d8a 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + [MASTER] # Python code to execute, usually for sys.path manipulation such as @@ -62,7 +77,7 @@ confidence= # --enable=similarities". 
If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,too-many-locals 
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,redefined-outer-name,import-error,too-many-locals [REPORTS] @@ -339,7 +354,7 @@ max-args=15 ignored-argument-names=_.* # Maximum number of locals for function / method body -max-locals=15 +max-locals=30 # Maximum number of return / yield for function / method body max-returns=6 @@ -360,7 +375,7 @@ max-attributes=15 min-public-methods=0 # Maximum number of public methods for a class (see R0904). -max-public-methods=20 +max-public-methods=50 # Maximum number of boolean expressions in a if statement max-bool-expr=5 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/__init__.py ---------------------------------------------------------------------- diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py new file mode 100644 index 0000000..14541d0 --- /dev/null +++ b/tests/mock/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import models, context, operations http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py new file mode 100644 index 0000000..a89612e --- /dev/null +++ b/tests/mock/context.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria import context, application_model_storage + +from . 
import models +from ..storage import InMemoryModelDriver + + +def simple(): + storage = application_model_storage(InMemoryModelDriver()) + storage.setup() + return context.workflow.WorkflowContext( + name='simple_context', + model_storage=storage, + resource_storage=None, + deployment_id=models.DEPLOYMENT_ID, + workflow_id=models.WORKFLOW_ID, + execution_id=models.EXECUTION_ID + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py new file mode 100644 index 0000000..633adbb --- /dev/null +++ b/tests/mock/models.py @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from aria.storage import models + +from . 
import operations + +DEPLOYMENT_ID = 'test_deployment_id' +BLUEPRINT_ID = 'test_blueprint_id' +WORKFLOW_ID = 'test_workflow_id' +EXECUTION_ID = 'test_execution_id' + + +def get_dependency_node(): + return models.Node( + id='dependency_node', + host_id='dependency_node', + blueprint_id=BLUEPRINT_ID, + type='test_node_type', + type_hierarchy=[], + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + properties={}, + operations=dict((key, {}) for key in operations.NODE_OPERATIONS), + relationships=[], + min_number_of_instances=1, + max_number_of_instances=1, + ) + + +def get_dependency_node_instance(dependency_node=None): + return models.NodeInstance( + id='dependency_node_instance', + host_id='dependency_node_instance', + deployment_id=DEPLOYMENT_ID, + runtime_properties={}, + version=None, + relationship_instances=[], + node=dependency_node or get_dependency_node() + ) + + +def get_relationship(target=None): + return models.Relationship( + target_id=target.id or get_dependency_node().id, + source_interfaces={}, + source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS), + target_interfaces={}, + target_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS), + type='rel_type', + type_hierarchy=[], + properties={}, + ) + + +def get_relationship_instance(target_instance=None, relationship=None): + return models.RelationshipInstance( + target_id=target_instance.id or get_dependency_node_instance().id, + target_name='test_target_name', + type='some_type', + relationship=relationship or get_relationship(target_instance.node + if target_instance else None) + ) + + +def get_dependent_node(relationship=None): + return models.Node( + id='dependent_node', + host_id='dependent_node', + blueprint_id=BLUEPRINT_ID, + type='test_node_type', + type_hierarchy=[], + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + properties={}, + operations=dict((key, {}) for key in 
operations.NODE_OPERATIONS), + relationships=[relationship or get_relationship()], + min_number_of_instances=1, + max_number_of_instances=1, + ) + + +def get_dependent_node_instance(relationship_instance, dependent_node=None): + return models.NodeInstance( + id='dependent_node_instance', + host_id='dependent_node_instance', + deployment_id=DEPLOYMENT_ID, + runtime_properties={}, + version=None, + relationship_instances=[relationship_instance or get_relationship_instance()], + node=dependent_node or get_dependency_node() + ) + + +def get_execution(): + return models.Execution( + id=EXECUTION_ID, + status=models.Execution.STARTED, + deployment_id=DEPLOYMENT_ID, + workflow_id=WORKFLOW_ID, + blueprint_id=BLUEPRINT_ID, + started_at=datetime.now(), + parameters=None + ) + + +def get_deployment(): + now = datetime.now() + return models.Deployment( + id=DEPLOYMENT_ID, + description=None, + created_at=now, + updated_at=now, + blueprint_id=BLUEPRINT_ID, + workflows={} + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/operations.py ---------------------------------------------------------------------- diff --git a/tests/mock/operations.py b/tests/mock/operations.py new file mode 100644 index 0000000..407061f --- /dev/null +++ b/tests/mock/operations.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +NODE_OPERATIONS_INSTALL = [ + 'aria.interfaces.lifecycle.create', + 'aria.interfaces.lifecycle.configure', + 'aria.interfaces.lifecycle.start', + ] +NODE_OPERATIONS_UNINSTALL = [ + 'aria.interfaces.lifecycle.stop', + 'aria.interfaces.lifecycle.delete', +] +NODE_OPERATIONS = NODE_OPERATIONS_INSTALL + NODE_OPERATIONS_UNINSTALL + +RELATIONSHIP_OPERATIONS_INSTALL = [ + 'aria.interfaces.relationship_lifecycle.preconfigure', + 'aria.interfaces.relationship_lifecycle.postconfigure', + 'aria.interfaces.relationship_lifecycle.establish', + ] +RELATIONSHIP_OPERATIONS_UNINSTALL = ['aria.interfaces.relationship_lifecycle.unlink'] +RELATIONSHIP_OPERATIONS = RELATIONSHIP_OPERATIONS_INSTALL + RELATIONSHIP_OPERATIONS_UNINSTALL http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/test_logger.py ---------------------------------------------------------------------- diff --git a/tests/test_logger.py b/tests/test_logger.py index 7891dd6..37731bb 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -80,7 +80,7 @@ def test_create_file_log_handler(): assert handler.baseFilename == temp_file.name assert handler.maxBytes == 5 * 1000 * 1024 assert handler.backupCount == 10 - assert handler.delay is True + assert handler.stream is None assert handler.level == logging.DEBUG assert handler.formatter == _default_file_formatter http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/__init__.py 
b/tests/workflows/__init__.py index ae1e83e..fe04b2f 100644 --- a/tests/workflows/__init__.py +++ b/tests/workflows/__init__.py @@ -12,3 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from . import api, builtin, core http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py new file mode 100644 index 0000000..09697dc --- /dev/null +++ b/tests/workflows/api/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py new file mode 100644 index 0000000..7119529 --- /dev/null +++ b/tests/workflows/api/test_task.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from aria import context +from aria.workflows import api + +from ... import mock + + +@pytest.fixture() +def ctx(): + """ + Create the following graph in storage: + dependency_node <------ dependent_node + :return: + """ + simple_context = mock.context.simple() + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance( + dependency_node=dependency_node) + + relationship = mock.models.get_relationship(dependency_node) + relationship_instance = mock.models.get_relationship_instance( + relationship=relationship, + target_instance=dependency_node_instance + ) + + dependent_node = mock.models.get_dependent_node(relationship) + dependent_node_instance = mock.models.get_dependent_node_instance( + dependent_node=dependent_node, + relationship_instance=relationship_instance + ) + + simple_context.model.node.store(dependent_node) + simple_context.model.node.store(dependency_node) + simple_context.model.node_instance.store(dependent_node_instance) + simple_context.model.node_instance.store(dependency_node_instance) + simple_context.model.relationship.store(relationship) + simple_context.model.relationship_instance.store(relationship_instance) + simple_context.model.execution.store(mock.models.get_execution()) +
simple_context.model.deployment.store(mock.models.get_deployment()) + + return simple_context + + +class TestOperationTask(object): + + def test_operation_task_creation(self): + workflow_context = mock.context.simple() + + name = 'task_name' + op_details = {'operation_details': True} + node_instance = mock.models.get_dependency_node_instance() + inputs = {'inputs': True} + + with context.workflow.current.push(workflow_context): + model_task = api.task.OperationTask(name=name, + operation_details=op_details, + node_instance=node_instance, + inputs=inputs) + + assert model_task.name == name + assert model_task.operation_details == op_details + assert model_task.node_instance == node_instance + assert model_task.inputs == inputs + + +class TestWorkflowTask(object): + + def test_workflow_task_creation(self, ctx): + + workspace = {} + + mock_class = type('mock_class', (object,), {'test_attribute': True}) + + def sub_workflow(**kwargs): + workspace.update(kwargs) + return mock_class + + with context.workflow.current.push(ctx): + workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg') + assert workflow_task.graph is mock_class + assert workflow_task.test_attribute is True http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/test_task_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py new file mode 100644 index 0000000..0ec0b0e --- /dev/null +++ b/tests/workflows/api/test_task_graph.py @@ -0,0 +1,745 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.workflows.api import task_graph, task + + +class MockTask(task.BaseTask): + def __init__(self): + super(MockTask, self).__init__(ctx={}) + + +@pytest.fixture +def graph(): + return task_graph.TaskGraph(name='mock-graph') + + +class TestTaskGraphTasks(object): + + def test_add_task(self, graph): + task = MockTask() + add_result = graph.add_tasks(task) + assert add_result == [task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_add_empty_group(self, graph): + result = graph.add_tasks([]) + assert result == [] + + def test_add_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == tasks + + def test_add_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == [tasks[0], tasks[2]] + + def test_add_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + added_tasks = graph.add_tasks(tasks) + assert added_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_add_existing_task(self, graph): + task = MockTask() + graph.add_tasks(task) + # adding a task already in graph - should have no effect, and return False + add_result = graph.add_tasks(task) + assert add_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def
test_remove_task(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + graph.remove_tasks(other_task) + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_tasks_with_dependency(self, graph): + task = MockTask() + dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependent_task) + graph.add_dependency(dependent_task, task) + remove_result = graph.remove_tasks(dependent_task) + assert remove_result == [dependent_task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + # asserting no dependencies are left for the dependent task + assert len(list(graph.get_dependencies(task))) == 0 + + def test_remove_empty_group(self, graph): + result = graph.remove_tasks([]) + assert result == [] + + def test_remove_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == tasks + + def test_remove_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == [task] + + def test_remove_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + removed_tasks = graph.remove_tasks(tasks) + assert removed_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_remove_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + # removing a task not in graph - should have no effect, and return False + remove_result = graph.remove_tasks(task_not_in_graph) + assert remove_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_has_task(self, graph): + task = 
MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task) is True + + def test_has_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task_not_in_graph) is False + + def test_has_empty_group(self, graph): + # the "empty task" is in the graph + assert graph.has_tasks([]) is True + + def test_has_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + assert graph.has_tasks(*tasks) is True + + def test_has_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + assert graph.has_tasks(tasks) is False + + def test_has_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + assert graph.has_tasks(tasks) is True + + def test_get_task(self, graph): + task = MockTask() + graph.add_tasks(task) + assert graph.get_task(task.id) == task + + def test_get_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.get_task(task_not_in_graph.id) + + +class TestTaskGraphGraphTraversal(object): + + def test_tasks_iteration(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, other_task]) + + def test_get_dependents(self, graph): + task = MockTask() + dependent_task_1 = MockTask() + dependent_task_2 = MockTask() + transitively_dependent_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependent_task_1) + graph.add_tasks(dependent_task_2) + graph.add_tasks(transitively_dependent_task) + + graph.add_dependency(dependent_task_1, task) + graph.add_dependency(dependent_task_2, task) + graph.add_dependency(transitively_dependent_task, 
dependent_task_2) + + dependent_tasks = list(graph.get_dependents(task)) + # transitively_dependent_task not expected to appear in the result + assert set(dependent_tasks) == set([dependent_task_1, dependent_task_2]) + + def test_get_task_empty_dependents(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependent_tasks = list(graph.get_dependents(task)) + assert len(dependent_tasks) == 0 + + def test_get_nonexistent_task_dependents(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependents(task_not_in_graph)) + + def test_get_dependencies(self, graph): + task = MockTask() + dependency_task_1 = MockTask() + dependency_task_2 = MockTask() + transitively_dependency_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependency_task_1) + graph.add_tasks(dependency_task_2) + graph.add_tasks(transitively_dependency_task) + + graph.add_dependency(task, dependency_task_1) + graph.add_dependency(task, dependency_task_2) + graph.add_dependency(dependency_task_2, transitively_dependency_task) + + dependency_tasks = list(graph.get_dependencies(task)) + # transitively_dependency_task not expected to appear in the result + assert set(dependency_tasks) == set([dependency_task_1, dependency_task_2]) + + def test_get_task_empty_dependencies(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 0 + + def test_get_nonexistent_task_dependencies(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependencies(task_not_in_graph)) + + +class TestTaskGraphDependencies(object): + + def test_add_dependency(self, graph): + task = MockTask() + 
dependency_task = MockTask() + unrelated_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(unrelated_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_existing_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + # adding a dependency already in graph - should have no effect, and return False + assert add_result is True + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task_not_in_graph, task) + + def test_add_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task, task_not_in_graph) + + def test_add_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == set((task,)) + + def test_add_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == 
set((task,)) + + def test_add_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks[0], task) is True + assert graph.has_dependency(group_tasks[1], task) is True + assert graph.has_dependency(group_tasks[2], task) is True + + def test_add_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_add_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + # adding a dependency on a specific task manually, + # before adding a dependency on the whole parallel + graph.add_dependency(task, group_tasks[1]) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_existing_dependency_between_groups(self, 
graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + # adding a dependency already in graph - should have no effect, and return False + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_has_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + assert graph.has_dependency(task, dependency_task) is True + + def test_has_nonexistent_dependency(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + assert graph.has_dependency(task, other_task) is False + + def test_has_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task_not_in_graph, task) + + def test_has_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task, task_not_in_graph) + + def test_has_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be False - dependency in an empty form + assert graph.has_dependency([], task) is False + + def 
test_has_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be True - dependency in an empty form + assert graph.has_dependency(task, []) is False + + def test_has_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(group_tasks, task) is False + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks, task) is True + + def test_has_dependency_dependency_parallel(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(task, group_tasks) is False + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks) is True + + def test_has_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is False + graph.add_dependency(group_2_tasks, group_1_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is True + + def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + parallel = graph.add_tasks(*parallel_tasks) + graph.add_dependency(task, parallel_tasks[1]) + # only a partial dependency exists - has_dependency is expected to return False + assert graph.has_dependency(task, parallel) is False + + def test_has_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert 
graph.has_dependency(group_1_tasks, group_2_tasks) is False + + def test_remove_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + another_dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(another_dependent_task) + graph.add_dependency(task, dependency_task) + graph.add_dependency(another_dependent_task, dependency_task) + + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + assert graph.has_dependency(task, dependency_task) is False + assert graph.has_dependency(another_dependent_task, dependency_task) is True + + def test_remove_nonexistent_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, dependency_task]) + + def test_remove_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task_not_in_graph, task) + + def test_remove_dependency_nonexistent_dependency(self, graph): + # in this test the dependency *task* is not in the graph, not just the dependency itself + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task, task_not_in_graph) + + def test_remove_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency([], task) + assert 
set(graph.tasks) == set((task,)) + + def test_remove_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency(task, []) + assert set(graph.tasks) == set((task,)) + + def test_remove_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + graph.remove_dependency(group_tasks, task) + remove_result = graph.has_dependency(group_tasks, task) + assert remove_result is False + assert graph.has_dependency(group_tasks[0], task) is False + assert graph.has_dependency(group_tasks[1], task) is False + assert graph.has_dependency(group_tasks[2], task) is False + + def test_remove_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + assert remove_result is False + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is False + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_2_tasks, group_1_tasks) + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_2_task, group_1_tasks[0]) is False + assert graph.has_dependency(group_2_task, group_1_tasks[1]) 
is False + assert graph.has_dependency(group_2_task, group_1_tasks[2]) is False + + def test_remove_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks[1]) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + # only a partial dependency exists - remove_dependency is expected to return False + assert remove_result is False + # no dependencies are expected to have changed + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + + # nested tests + + def test_group_with_nested_sequence(self, graph): + all_tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(all_tasks[0], + graph.sequence(all_tasks[1], all_tasks[2], all_tasks[3]), + all_tasks[4]) + assert set(graph.tasks) == set(all_tasks) + + # all_tasks[2] and all_tasks[3] should each have a single dependency; the rest should have none + assert len(list(graph.get_dependencies(all_tasks[0]))) == 0 + assert len(list(graph.get_dependencies(all_tasks[1]))) == 0 + assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]]) + assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]]) + assert len(list(graph.get_dependencies(all_tasks[4]))) == 0 + + def
test_group_with_nested_group(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # none of the tasks should have any dependencies + for i in xrange(len(tasks)): + assert len(list(graph.get_dependencies(tasks[i]))) == 0 + + def test_group_with_recursively_nested_group(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]: + for i in xrange(len(tasks_list[:3])): + assert len(list(graph.get_dependencies(tasks_list[i]))) == 0 + + def test_group_with_recursively_nested_group_and_interdependencies(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + graph.add_dependency(tasks[2], nested_tasks[2]) + graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0]) + graph.add_dependency(recursively_nested_tasks[1], tasks[0]) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + assert set(graph.get_dependencies(tasks[1])) == set() + assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set() + assert set(graph.get_dependencies(nested_tasks[1])) == set([recursively_nested_tasks[0]]) + assert set(graph.get_dependencies(nested_tasks[2])) == set() + + assert 
set(graph.get_dependencies(recursively_nested_tasks[0])) == set() + assert set(graph.get_dependencies(recursively_nested_tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(recursively_nested_tasks[2])) == set() + + +class TestTaskGraphSequence(object): + + def test_sequence(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph): + # tests both that tasks which weren't previously in graph get inserted, and + # that existing tasks don't get re-added to graph + tasks = [MockTask(), MockTask(), MockTask()] + # insert some tasks and dependencies to the graph + graph.add_tasks(tasks[1]) + graph.add_tasks(tasks[2]) + graph.add_dependency(tasks[2], tasks[1]) + + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_nested_sequence(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert len(list(graph.get_dependencies(tasks[1]))) == 1 + assert len(list(graph.get_dependencies(tasks[2]))) == 2 + assert len(list(graph.get_dependencies(tasks[3]))) == 2 + assert len(list(graph.get_dependencies(tasks[4]))) == 3 + + def test_sequence_with_nested_group(self,
graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + # rest of the tasks (except last) should have a single dependency - the first task + for i in xrange(1, 4): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[0]]) + # last task should have a dependency on all tasks except for the first one + assert set(graph.get_dependencies(tasks[4])) == set([tasks[1], tasks[2], tasks[3]]) + + def test_sequence_with_recursively_nested_group(self, graph): + recursively_nested_group = [MockTask(), MockTask()] + nested_group = [MockTask(), recursively_nested_group, MockTask()] + sequence_tasks = [MockTask(), nested_group, MockTask()] + + graph.sequence(*sequence_tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set([sequence_tasks[0], nested_group[0], + recursively_nested_group[0], recursively_nested_group[1], + nested_group[2], sequence_tasks[2]]) + + assert list(graph.get_dependencies(nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[1])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(nested_group[2])) == [sequence_tasks[0]] + + assert list(graph.get_dependents(nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[1])) == [sequence_tasks[2]] + assert list(graph.get_dependents(nested_group[2])) == [sequence_tasks[2]] + + def test_sequence_with_empty_group(self, graph): + tasks = [MockTask(), [], MockTask()] + graph.sequence(*tasks) + graph_tasks = set([t for t in graph.tasks]) + assert graph_tasks ==
set([tasks[0], tasks[2]]) + assert list(graph.get_dependents(tasks[0])) == [tasks[2]] + assert list(graph.get_dependencies(tasks[2])) == [tasks[0]] + + def test_sequence_with_recursively_nested_sequence_and_interdependencies(self, graph): + recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), MockTask())) + nested_tasks = list(graph.sequence(MockTask(), + MockTask(), + MockTask(), + recursively_nested_tasks)) + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.sequence(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + for i in xrange(1, len(tasks[:-1])): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[i - 1]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]]) + for i in xrange(1, len(nested_tasks[:-1])): + assert set(graph.get_dependencies(nested_tasks[i])) == \ + set([tasks[2], nested_tasks[i-1]]) + + assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \ + set([tasks[2], nested_tasks[2]]) + for i in xrange(1, len(recursively_nested_tasks[:-1])): + assert set(graph.get_dependencies(recursively_nested_tasks[i])) == \ + set([tasks[2], nested_tasks[2], recursively_nested_tasks[i-1]])
