Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution 814306ba0 -> c211e1064 (forced update)
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c211e106 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c211e106 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c211e106 Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution Commit: c211e1064fc84236ef753783fb4367d335ab197a Parents: 1fee85c Author: max-orlov <ma...@gigaspaces.com> Authored: Wed Jun 21 12:41:33 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Jun 21 14:00:41 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 2 - aria/orchestrator/workflow_runner.py | 3 +- aria/orchestrator/workflows/core/compile.py | 198 ++++++++++--------- tests/orchestrator/context/__init__.py | 2 +- tests/orchestrator/context/test_serialize.py | 2 +- .../orchestrator/execution_plugin/test_local.py | 2 +- tests/orchestrator/execution_plugin/test_ssh.py | 3 +- .../orchestrator/workflows/core/test_engine.py | 2 +- .../orchestrator/workflows/core/test_events.py | 7 +- .../test_task_graph_into_execution_graph.py | 6 +- .../executor/test_process_executor_extension.py | 2 +- .../test_process_executor_tracked_changes.py | 2 +- 12 files changed, 116 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 17d2476..541f4c4 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -305,7 +305,6 @@ class TaskBase(mixins.ModelMixin): ended_at = Column(DateTime, default=None) attempts_count = Column(Integer, default=1) - _api_id = Column(String) _executor = Column(PickleType) _context_cls = Column(PickleType) _stub_type = Column(Enum(*STUB_TYPES)) @@ -441,7 +440,6 @@ class TaskBase(mixins.ModelMixin): 'plugin': api_task.plugin, 'function': api_task.function, 'arguments': api_task.arguments, - '_api_id': api_task.id, '_context_cls': api_task._context_cls, '_executor': executor, } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 9e6b3ad..dcd8ce3 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -87,8 +87,7 @@ class WorkflowRunner(object): execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compile.create_execution_tasks( - self._workflow_context, self._tasks_graph, executor.__class__) + compile.GraphCompiler(self._workflow_context, executor.__class__).compile(self._tasks_graph) self._engine = engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/aria/orchestrator/workflows/core/compile.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py index 932268a..83de22c 100644 --- a/aria/orchestrator/workflows/core/compile.py +++ b/aria/orchestrator/workflows/core/compile.py @@ -18,99 +18,105 @@ from ....modeling import models from .. import executor, api -def create_execution_tasks(ctx, task_graph, default_executor): - execution = ctx.execution - _construct_execution_tasks(execution, task_graph, default_executor) - ctx.model.execution.update(execution) - return execution.tasks - - -def _construct_execution_tasks(execution, - task_graph, - default_executor, - stub_executor=executor.base.StubTaskExecutor, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - depends_on = list(depends_on) - - # Insert start marker - start_task = models.Task(execution=execution, - dependencies=depends_on, - _api_id=_start_graph_suffix(task_graph.id), - _stub_type=start_stub_type, - _executor=stub_executor) - - for task in task_graph.topological_order(reverse=True): - operation_dependencies = _get_tasks_from_dependencies( - execution, task_graph.get_dependencies(task), [start_task]) - - if isinstance(task, api.task.OperationTask): - models.Task.from_api_task(api_task=task, - executor=default_executor, - dependencies=operation_dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - _construct_execution_tasks( - execution=execution, - task_graph=task, - default_executor=default_executor, - stub_executor=stub_executor, - start_stub_type=models.Task.START_SUBWROFKLOW, - end_stub_type=models.Task.END_SUBWORKFLOW, - depends_on=operation_dependencies - ) - elif isinstance(task, api.task.StubTask): - models.Task(execution=execution, - dependencies=operation_dependencies, - _api_id=task.id, - _executor=stub_executor, - _stub_type=models.Task.STUB, - ) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task], - execution=execution, - _api_id=_end_graph_suffix(task_graph.id), - _executor=stub_executor, - _stub_type=end_stub_type) - - -def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - -def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - -def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - -def _get_tasks_from_dependencies(execution, dependencies, default=()): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is - dependency_name = dependency.id - else: - dependency_name = _end_graph_suffix(dependency.id) - tasks.extend(task for task in execution.tasks if task._api_id == dependency_name) - return tasks or default +# TODO: is class really needed? + +class GraphCompiler(object): + def __init__(self, ctx, default_executor): + self._ctx = ctx + self._default_executor = default_executor + self._stub_executor = executor.base.StubTaskExecutor + self._model_to_api_id = {} + + def compile(self, + task_graph, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + task_graph = task_graph or self._task_graph + depends_on = list(depends_on) + + # Insert start marker + start_task = self._create_stub_task( + start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, + ) + + for task in task_graph.topological_order(reverse=True): + dependencies = \ + (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) + or [start_task]) + + if isinstance(task, api.task.OperationTask): + self._create_operation_task(task, dependencies) + + elif isinstance(task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + self.compile( + task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies + ) + elif isinstance(task, api.task.StubTask): + self._create_stub_task(models.Task.STUB, dependencies, task.id) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + self._create_stub_task( + end_stub_type, + self._get_non_dependent_tasks(self._ctx.execution) or [start_task], + self._end_graph_suffix(task_graph.id), + task_graph.name + ) + + def _create_stub_task(self, stub_type, dependencies, api_id, name=None): + model_task = models.Task( + name=name, + dependencies=dependencies, + execution=self._ctx.execution, + _executor=self._stub_executor, + _stub_type=stub_type) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_id + return model_task + + def _create_operation_task(self, api_task, dependencies): + model_task = models.Task.from_api_task( + api_task, self._default_executor, dependencies=dependencies) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_task.id + return model_task + + @staticmethod + def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + @staticmethod + def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + @staticmethod + def _get_non_dependent_tasks(execution): + tasks_with_dependencies = set() + for task in execution.tasks: + tasks_with_dependencies.update(task.dependencies) + return list(set(execution.tasks) - set(tasks_with_dependencies)) + + def _get_tasks_from_dependencies(self, dependencies): + """ + Returns task list from dependencies. + """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + # This is + dependency_name = dependency.id + else: + dependency_name = self._end_graph_suffix(dependency.id) + tasks.extend(task for task in self._ctx.execution.tasks + if self._model_to_api_id.get(task.id, None) == dependency_name) + return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 086a066..752706e 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -26,7 +26,7 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.create_execution_tasks(workflow_context, graph, executor.__class__) + compile.GraphCompiler(workflow_context, executor.__class__).compile(graph) eng = engine.Engine(executors={executor.__class__: executor}) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 5db5b63..b7335a0 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir): context.model.node.update(node) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + compile.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 1695320..ab6310c 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -500,7 +500,7 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(workflow_context, tasks_graph, executor.__class__) + compile.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index fb1dc09..13ad1a3 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -254,8 +254,7 @@ class TestWithActualSSHServer(object): graph.sequence(*ops) return graph tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks( - self._workflow_context, tasks_graph, self._executor.__class__) + compile.GraphCompiler(self._workflow_context, self._executor.__class__).compile(tasks_graph) eng = engine.Engine({self._executor.__class__: self._executor}) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index b77d284..7275723 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -50,7 +50,7 @@ class BaseTest(object): @staticmethod def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.create_execution_tasks(workflow_context, graph, executor.__class__) + compile.GraphCompiler(workflow_context, executor.__class__).compile(graph) return engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 2b82443..32a6b7b 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -113,10 +113,9 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_name=op_name, operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface - compile.create_execution_tasks( - ctx, - single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name), - ThreadExecutor) + compile.GraphCompiler(ctx, ThreadExecutor).compile( + single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) + ) eng = engine.Engine(executors={ThreadExecutor: ThreadExecutor()}) eng.execute(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index f5fb17a..ef20374 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -65,7 +65,8 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(inner_task_graph, simple_before_task) test_task_graph.add_dependency(simple_after_task, inner_task_graph) - compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor) + graph_compiler = compile.GraphCompiler(workflow_context, base.StubTaskExecutor) + graph_compiler.compile(test_task_graph) execution_tasks = topological_sort(workflow_context._graph) @@ -81,7 +82,7 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == [t._api_id for t in execution_tasks] + assert expected_tasks_names == [graph_compiler._model_to_api_id[t.id] for t in execution_tasks] assert all(isinstance(task, models.Task) for task in execution_tasks) execution_tasks = iter(execution_tasks) @@ -97,7 +98,6 @@ def test_task_graph_into_execution_graph(tmpdir): def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task._api_id == api_task.id assert execution_task.name == api_task.name assert execution_task.function == api_task.function assert execution_task.actor == api_task.actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index ba98c4f..aa08685 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -57,7 +57,7 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + compile.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = get_node(context).attributes.get('out').value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c211e106/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 2f1c325..7102b13 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + compile.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')