Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks 979a4b445 -> 092b45f0d
graph work Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/092b45f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/092b45f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/092b45f0 Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 092b45f0de707cc9c1ed8bebe594bb0b7bb5846f Parents: 979a4b4 Author: max-orlov <ma...@gigaspaces.com> Authored: Tue Jun 13 20:33:34 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Tue Jun 13 20:33:34 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 10 +-- aria/orchestrator/workflows/core/engine.py | 16 ++-- aria/orchestrator/workflows/core/translation.py | 93 +++++++++----------- .../test_task_graph_into_execution_graph.py | 13 +-- .../workflows/executor/test_process_executor.py | 1 - 5 files changed, 60 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 37aa431..4a5771a 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin): def retry(message=None, retry_interval=None): raise TaskRetryException(message, retry_interval=retry_interval) - @declared_attr - def operation_task_dependency_fk(cls): - """For Type one-to-many to Type""" - return relationship.foreign_key('task', nullable=True) + # @declared_attr + # def dependency_fk(cls): + # """For Type one-to-many to Type""" + # return relationship.foreign_key('task', nullable=True) @declared_attr def dependent_tasks(cls): - return relationship.one_to_many_self(cls, 'operation_task_dependency_fk') + return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies') def has_ended(self): if self.stub_type is not None: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 826a7a2..67e8c1d 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -42,13 +42,13 @@ class Engine(logger.LoggerMixin): self._workflow_context = workflow_context self._execution_graph = networkx.DiGraph() self._executor_kwargs = executor_kwargs - translation.build_execution_graph(task_graph=tasks_graph, - execution_graph=self._execution_graph, - default_executor=executor, - execution=workflow_context.execution) + translation.store_tasks(task_graph=tasks_graph, + execution_graph=self._execution_graph, + default_executor=executor, + execution=workflow_context.execution) # Flush changes - workflow_context.model.execution._session.flush() + workflow_context.model.execution.update(workflow_context.execution) def execute(self): """ @@ -106,11 +106,7 @@ class Engine(logger.LoggerMixin): def _tasks_iter(self): for _, data in self._execution_graph.nodes_iter(data=True): - task = data['task'] - if isinstance(task, models.Task): - if not task.has_ended(): - self._workflow_context.model.task.refresh(task) - yield task + yield self._workflow_context.model.task.get(data['task'].id) def _handle_executable_task(self, task): if not task.stub_type: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index be4e34d..8da9bd7 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -22,14 +22,10 @@ from .. import api from ..executor import base -def build_execution_graph( - task_graph, - execution_graph, - default_executor, - execution, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): +def store_tasks(ctx, task_graph, default_executor, execution, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): """ Translates the user graph to the execution graph :param task_graph: The user's graph @@ -39,27 +35,28 @@ def build_execution_graph( :param end_stub_type: internal use :param depends_on: internal use """ + depends_on = list(depends_on) + # Insert start marker start_task = models.Task(api_id=_start_graph_suffix(task_graph.id), _executor=base.StubTaskExecutor, execution=execution, - stub_type=start_stub_type) - _add_task_and_dependencies(execution_graph, start_task, depends_on) + stub_type=start_stub_type, + dependencies=depends_on) for api_task in task_graph.topological_order(reverse=True): dependencies = task_graph.get_dependencies(api_task) - operation_dependencies = _get_tasks_from_dependencies( - execution_graph, dependencies, default=[start_task]) + operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task]) if isinstance(api_task, api.task.OperationTask): - operation_task = models.Task.from_api_task(api_task=api_task, - executor=default_executor) - _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) + models.Task.from_api_task( + api_task=api_task, executor=default_executor, dependencies=operation_dependencies) + elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers - build_execution_graph( + store_tasks( + ctx=ctx, task_graph=api_task, - execution_graph=execution_graph, default_executor=default_executor, execution=execution, start_stub_type=models.Task.START_SUBWROFKLOW, @@ -67,33 +64,41 @@ def build_execution_graph( depends_on=operation_dependencies ) elif isinstance(api_task, api.task.StubTask): - stub_task = models.Task(api_id=api_task.id, - _executor=base.StubTaskExecutor, - execution=execution, - stub_type=models.Task.STUB) - _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) + models.Task(api_id=api_task.id, + _executor=base.StubTaskExecutor, + execution=execution, + stub_type=models.Task.STUB, + dependencies=operation_dependencies) else: raise RuntimeError('Undefined state') # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies( - execution_graph, - _get_non_dependency_tasks(task_graph), - default=[start_task]) - end_task = models.Task(api_id=_end_graph_suffix(task_graph.id), - _executor=base.StubTaskExecutor, - execution=execution, - stub_type=end_stub_type) - _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) + workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks] + models.Task(api_id=_end_graph_suffix(task_graph.id), + _executor=base.StubTaskExecutor, + execution=execution, + stub_type=end_stub_type, + dependencies=workflow_dependencies) + + ctx.model.execution.update(execution) + +def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + +def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) -def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): - execution_graph.add_node(operation_task.api_id, task=operation_task) - for dependency in operation_dependencies: - execution_graph.add_edge(dependency.api_id, operation_task.api_id) +def construct_graph(graph, execution): + for task in execution.tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) + return graph -def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): + +def _get_tasks_from_dependencies(ctx, dependencies, default=()): """ Returns task list from dependencies. """ @@ -103,19 +108,5 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): dependency_name = dependency.id else: dependency_name = _end_graph_suffix(dependency.id) - tasks.append(execution_graph.node[dependency_name]['task']) + tasks.extend(list(ctx.model.task.list(filters={'name': dependency_name}))) return tasks or default - - -def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - -def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - -def _get_non_dependency_tasks(graph): - for task in graph.tasks: - if len(list(graph.get_dependents(task))) == 0: - yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 398ca7e..4abed37 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -66,11 +66,12 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(simple_after_task, inner_task_graph) # Direct check - execution_graph = DiGraph() - core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph, - execution=task_context.model.execution.list()[0], - default_executor=base.StubTaskExecutor) + core.translation.store_tasks(ctx=task_context, + task_graph=test_task_graph, + execution=task_context.model.execution.list()[0], + default_executor=base.StubTaskExecutor) + + execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7 @@ -85,7 +86,7 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == execution_tasks + assert expected_tasks_names == [t.api_id for t in execution_tasks] assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task) for task_name in execution_tasks) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 058190e..bca2ea3 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -30,7 +30,6 @@ from tests.fixtures import ( # pylint: disable=unused-import plugin_manager, fs_model as model ) -from . import MockTask class TestProcessExecutor(object):