Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks 7f5c6204f -> ecf5fc9f0
wip2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ecf5fc9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ecf5fc9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ecf5fc9f Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: ecf5fc9f0223fc96e22d216edea636874902d237 Parents: 7f5c620 Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 15 13:49:19 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 15 13:49:19 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 2 +- aria/orchestrator/workflows/core/engine.py | 78 ++++++++++---------- tests/orchestrator/test_workflow_runner.py | 28 +++---- .../orchestrator/workflows/core/test_engine.py | 6 +- .../test_task_graph_into_execution_graph.py | 17 ++--- 5 files changed, 65 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ecf5fc9f/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 919da58..1963087 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -109,7 +109,7 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(self._workflow_context) + self._engine.execute(ctx=self._workflow_context) def cancel(self): self._engine.cancel_execution() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ecf5fc9f/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index b9c3439..ade3661 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -38,75 +38,78 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, **kwargs): super(Engine, self).__init__(**kwargs) self._executors = {executor.__class__: executor} - self._workflow_context = None def execute(self, ctx): """ execute the workflow """ - self._workflow_context = ctx - try: - events.start_workflow_signal.send(self._workflow_context) + events.start_workflow_signal.send(ctx) while True: - cancel = self._is_cancel() + cancel = self._is_cancel(ctx) if cancel: break - for task in self._ended_tasks(): - self._handle_ended_tasks(task) - for task in self._executable_tasks(): - self._handle_executable_task(task) - if self._all_tasks_consumed(): + for task in self._ended_tasks(ctx): + self._handle_ended_tasks(ctx, task) + for task in self._executable_tasks(ctx): + self._handle_executable_task(ctx, task) + if self._all_tasks_consumed(ctx): break else: time.sleep(0.1) if cancel: - events.on_cancelled_workflow_signal.send(self._workflow_context) + events.on_cancelled_workflow_signal.send(ctx) else: - events.on_success_workflow_signal.send(self._workflow_context) + events.on_success_workflow_signal.send(ctx) except BaseException as e: - events.on_failure_workflow_signal.send(self._workflow_context, exception=e) + events.on_failure_workflow_signal.send(ctx, exception=e) raise - def cancel_execution(self): + @staticmethod + def cancel_execution(ctx): """ Send a cancel request to the engine. If execution already started, execution status will be modified to 'cancelling' status. If execution is in pending mode, execution status will be modified to 'cancelled' directly. """ - events.on_cancelling_workflow_signal.send(self._workflow_context) - self._workflow_context.execution = self._workflow_context.execution + events.on_cancelling_workflow_signal.send(ctx) + ctx.execution = ctx.execution - def _is_cancel(self): - execution = self._workflow_context.model.execution.update(self._workflow_context.execution) + @staticmethod + def _is_cancel(ctx): + execution = ctx.model.execution.update(ctx.execution) return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) - def _executable_tasks(self): + def _executable_tasks(self, ctx): now = datetime.utcnow() return ( - task for task in self._tasks_iter() - if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task) + task for task in self._tasks_iter(ctx) + if task.is_waiting() and task.due_at <= now and \ + not self._task_has_dependencies(ctx, task) ) - def _ended_tasks(self): - for task in self._tasks_iter(): - if task.has_ended() and task in self._workflow_context.graph: + def _ended_tasks(self, ctx): + for task in self._tasks_iter(ctx): + if task.has_ended() and task in ctx.graph: yield task - def _task_has_dependencies(self, task): + @staticmethod + def _task_has_dependencies(ctx, task): return task.dependencies and \ - all(d in self._workflow_context.graph for d in task.dependencies) + all(d in ctx.graph for d in task.dependencies) - def _all_tasks_consumed(self): - return len(self._workflow_context.graph.node) == 0 + @staticmethod + def _all_tasks_consumed(ctx): + return len(ctx.graph.node) == 0 - def _tasks_iter(self): - for task in self._workflow_context.execution.tasks: + @staticmethod + def _tasks_iter(ctx): + for task in ctx.execution.tasks: if not task.has_ended(): - task = self._workflow_context.model.task.refresh(task) + task = ctx.model.task.refresh(task) yield task - def _handle_executable_task(self, task): + def _handle_executable_task(self, ctx, task): if not task.stub_type: events.sent_task_signal.send(task) @@ -116,9 +119,9 @@ class Engine(logger.LoggerMixin): context_cls = task._context_cls or operation.BaseOperationContext op_ctx = context_cls( - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - workdir=self._workflow_context._workdir, + model_storage=ctx.model, + resource_storage=ctx.resource, + workdir=ctx._workdir, task_id=task.id, actor_id=task.actor.id if task.actor else None, service_id=task.execution.service.id, @@ -128,8 +131,9 @@ class Engine(logger.LoggerMixin): executor.execute(op_ctx) - def _handle_ended_tasks(self, task): + @staticmethod + def _handle_ended_tasks(ctx, task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: - self._workflow_context.graph.remove_node(task) + ctx.graph.remove_node(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ecf5fc9f/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index cd50580..4011370 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -117,30 +117,28 @@ def test_task_configuration_parameters(request): task_max_attempts = 5 task_retry_interval = 7 - with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + with mock.patch('aria.orchestrator.workflow_runner.Engine.execute') as mock_engine_execute: _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts, - task_retry_interval=task_retry_interval) - _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context']._task_max_attempts == task_max_attempts - # assert engine_kwargs['workflow_context']._task_retry_interval == task_retry_interval + task_retry_interval=task_retry_interval).execute() + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts + assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval def test_execute(request, service): mock_workflow = _setup_mock_workflow_in_service(request) mock_engine = mock.MagicMock() - with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine) \ - as mock_engine_cls: + with mock.patch('aria.orchestrator.workflow_runner.Engine.execute', return_value=mock_engine) \ + as mock_engine_execute: workflow_runner = _create_workflow_runner(request, mock_workflow) + workflow_runner.execute() - _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context'].service.id == service.id - # assert engine_kwargs['workflow_context'].execution.workflow_name == 'test_workflow' + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx'].service.id == service.id + assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' - workflow_runner.execute() - mock_engine.execute.assert_called_once_with() + mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context) def test_cancel_execution(request): @@ -160,8 +158,6 @@ def test_execution_model_creation(request, service, model): workflow_runner = _create_workflow_runner(request, mock_workflow) _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context'].execution == workflow_runner.execution assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution assert workflow_runner.execution.service.id == service.id assert workflow_runner.execution.workflow_name == mock_workflow http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ecf5fc9f/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 3a14a44..2fbf4a9 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -45,7 +45,7 @@ class BaseTest(object): eng = cls._engine(workflow_func=workflow_func, workflow_context=workflow_context, executor=executor) - eng.execute(execution_graph=workflow_runner.get_execution_graph(workflow_context.execution)) + eng.execute(ctx=workflow_context) return eng @staticmethod @@ -262,7 +262,7 @@ class TestCancel(BaseTest): t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context)) t.start() time.sleep(10) - eng.cancel_execution() + eng.cancel_execution(workflow_context) t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow assert not t.is_alive() # if join is timed out it will not raise an exception assert workflow_context.states == ['start', 'cancel'] @@ -281,7 +281,7 @@ class TestCancel(BaseTest): eng = self._engine(workflow_func=mock_workflow, workflow_context=workflow_context, executor=executor) - eng.cancel_execution() + eng.cancel_execution(workflow_context) execution = workflow_context.execution assert execution.status == models.Execution.CANCELLED http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ecf5fc9f/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index aebae38..61b7ce7 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -29,8 +29,8 @@ from tests import storage def test_task_graph_into_execution_graph(tmpdir): interface_name = 'Standard' operation_name = 'create' - task_context = mock.context.simple(str(tmpdir)) - node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, @@ -38,12 +38,12 @@ def test_task_graph_into_execution_graph(tmpdir): operation_kwargs=dict(function='test') ) node.interfaces[interface.name] = interface - task_context.model.node.update(node) + workflow_context.model.node.update(node) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) - with context.workflow.current.push(task_context): + with context.workflow.current.push(workflow_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') simple_before_task = api.task.OperationTask( node, @@ -68,13 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(simple_after_task, inner_task_graph) # Direct check - execution = task_context.model.execution.list()[0] + execution = workflow_context.model.execution.list()[0] workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor) - task_context.execution = execution + workflow_context.execution = execution - execution_graph = workflow_runner.get_execution_graph(execution) - execution_tasks = topological_sort(execution_graph) + execution_tasks = topological_sort(workflow_context.graph) assert len(execution_tasks) == 7 @@ -100,7 +99,7 @@ def test_task_graph_into_execution_graph(tmpdir): _assert_execution_is_api_task(next(execution_tasks), simple_after_task) assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW - storage.release_sqlite_storage(task_context.model) + storage.release_sqlite_storage(workflow_context.model) def _assert_execution_is_api_task(execution_task, api_task):