Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks 23ac30197 -> f29148af4 (forced update)
tiny fix to testenv Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f29148af Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f29148af Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f29148af Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: f29148af40766753817d5528660c474f19484411 Parents: 7c5b9ff Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 15 17:32:53 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 15 18:14:32 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 2 +- aria/orchestrator/context/workflow.py | 4 ++-- aria/orchestrator/workflows/core/engine.py | 8 ++++---- .../orchestrator/workflows/core/events_handler.py | 18 +++++++++--------- aria/orchestrator/workflows/events_logging.py | 16 ++++++++-------- aria/orchestrator/workflows/executor/base.py | 4 ++-- aria/orchestrator/workflows/executor/dry.py | 2 +- tests/end2end/testenv.py | 4 +++- .../core/test_task_graph_into_execution_graph.py | 2 +- 9 files changed, 31 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 6071c9b..2e897b5 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -109,7 +109,7 @@ class BaseOperationContext(common.BaseContext): @property @contextmanager - def track_task(self): + def track_changes(self): self.model.task.update(self.task) yield self.model.task.update(self.task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 6a7fb1b..ce7a892 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -96,7 +96,7 @@ class WorkflowContext(BaseContext): ) @property - def graph(self): + def _graph(self): if self._execution_graph is None: graph = DiGraph() for task in self.execution.tasks: @@ -109,7 +109,7 @@ class WorkflowContext(BaseContext): @property @contextmanager - def track_execution(self): + def track_changes(self): self._model.execution.update(self.execution) yield self._model.execution.update(self.execution) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index db5cc8e..48fb60a 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -89,16 +89,16 @@ class Engine(logger.LoggerMixin): def _ended_tasks(self, ctx): for task in self._tasks_iter(ctx): - if task.has_ended() and task in ctx.graph: + if task.has_ended() and task in ctx._graph: yield task @staticmethod def _task_has_dependencies(ctx, task): - return len(ctx.graph.pred.get(task, [])) > 0 + return len(ctx._graph.pred.get(task, [])) > 0 @staticmethod def _all_tasks_consumed(ctx): - return len(ctx.graph.node) == 0 + return len(ctx._graph.node) == 0 @staticmethod def _tasks_iter(ctx): @@ -131,4 +131,4 @@ class Engine(logger.LoggerMixin): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: - ctx.graph.remove_node(task) + ctx._graph.remove_node(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index b9d467d..3a780d5 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -31,13 +31,13 @@ from ... import exceptions @events.sent_task_signal.connect def _task_sent(ctx, *args, **kwargs): - with ctx.track_task: + with ctx.track_changes: ctx.task.status = ctx.task.SENT @events.start_task_signal.connect def _task_started(ctx, *args, **kwargs): - with ctx.track_task: + with ctx.track_changes: ctx.task.started_at = datetime.utcnow() ctx.task.status = ctx.task.STARTED _update_node_state_if_necessary(ctx, is_transitional=True) @@ -45,7 +45,7 @@ def _task_started(ctx, *args, **kwargs): @events.on_failure_task_signal.connect def _task_failed(ctx, exception, *args, **kwargs): - with ctx.track_task: + with ctx.track_changes: should_retry = all([ not isinstance(exception, exceptions.TaskAbortException), ctx.task.attempts_count < ctx.task.max_attempts or @@ -71,7 +71,7 @@ def _task_failed(ctx, exception, *args, **kwargs): @events.on_success_task_signal.connect def _task_succeeded(ctx, *args, **kwargs): - with ctx.track_task: + with ctx.track_changes: ctx.task.ended_at = datetime.utcnow() ctx.task.status = ctx.task.SUCCESS @@ -80,7 +80,7 @@ def _task_succeeded(ctx, *args, **kwargs): @events.start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - with workflow_context.track_execution: + with workflow_context.track_changes: execution = workflow_context.execution # the execution may already be in the process of cancelling if execution.status in (execution.CANCELLING, execution.CANCELLED): @@ -91,7 +91,7 @@ def _workflow_started(workflow_context, *args, **kwargs): @events.on_failure_workflow_signal.connect def _workflow_failed(workflow_context, exception, *args, **kwargs): - with workflow_context.track_execution: + with workflow_context.track_changes: execution = workflow_context.execution execution.error = str(exception) execution.status = execution.FAILED @@ -100,7 +100,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): @events.on_success_workflow_signal.connect def _workflow_succeeded(workflow_context, *args, **kwargs): - with workflow_context.track_execution: + with workflow_context.track_changes: execution = workflow_context.execution execution.status = execution.SUCCEEDED execution.ended_at = datetime.utcnow() @@ -108,7 +108,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): @events.on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): - with workflow_context.track_execution: + with workflow_context.track_changes: execution = workflow_context.execution # _workflow_cancelling function may have called this function already if execution.status == execution.CANCELLED: @@ -123,7 +123,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): - with workflow_context.track_execution: + with workflow_context.track_changes: execution = workflow_context.execution if execution.status == execution.PENDING: return _workflow_cancelled(workflow_context=workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 543e190..12aebab 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -35,7 +35,7 @@ def _get_task_name(task): @events.start_task_signal.connect def _start_task_handler(ctx, **kwargs): - with ctx.track_task: + with ctx.track_changes: # If the task has no function this is an empty task. if ctx.task.function: suffix = 'started...' @@ -50,7 +50,7 @@ def _start_task_handler(ctx, **kwargs): @events.on_success_task_signal.connect def _success_task_handler(ctx, **kwargs): - with ctx.track_task: + with ctx.track_changes: if not ctx.task.function: return ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful' @@ -59,7 +59,7 @@ def _success_task_handler(ctx, **kwargs): @events.on_failure_task_signal.connect def _failure_operation_handler(ctx, traceback, **kwargs): - with ctx.track_task: + with ctx.track_changes: ctx.logger.error( '{name} {task.interface_name}.{task.operation_name} failed' .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback) @@ -68,19 +68,19 @@ def _failure_operation_handler(ctx, traceback, **kwargs): @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - with context.track_execution: + with context.track_changes: context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) @events.on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - with context.track_execution: + with context.track_changes: context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) @events.on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): - with context.track_execution: + with context.track_changes: context.logger.info( "'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context) ) @@ -88,13 +88,13 @@ def _success_workflow_handler(context, **kwargs): @events.on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): - with context.track_execution: + with context.track_changes: context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) @events.on_cancelling_workflow_signal.connect def _cancelling_workflow_handler(context, **kwargs): - with context.track_execution: + with context.track_changes: context.logger.info( "Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index ced80d6..9e1ce7e 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -33,7 +33,7 @@ class BaseExecutor(logger.LoggerMixin): Execute a task :param task: task to execute """ - with ctx.track_task: + with ctx.track_changes: if ctx.task.function: self._execute(ctx) else: @@ -64,5 +64,5 @@ class BaseExecutor(logger.LoggerMixin): class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method def execute(self, ctx, *args, **kwargs): - with ctx.track_task: + with ctx.track_changes: ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index a5f8507..88d2e12 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -26,7 +26,7 @@ class DryExecutor(base.BaseExecutor): Executor which dry runs tasks - prints task information without causing any side effects """ def execute(self, ctx): - with ctx.track_task: + with ctx.track_changes: # updating the task manually instead of calling self._task_started(task), # to avoid any side effects raising that event might cause ctx.task.started_at = datetime.utcnow() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/tests/end2end/testenv.py ---------------------------------------------------------------------- diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py index 9da747c..87ca5bd 100644 --- a/tests/end2end/testenv.py +++ b/tests/end2end/testenv.py @@ -60,7 +60,9 @@ class TestEnvironment(object): def execute_workflow(self, service_name, workflow_name, dry=False): self.cli.executions.start(workflow_name, service_name=service_name, dry=dry) - self.model_storage.execution.refresh(self.model_storage.execution.list()[0]) + service = self.model_storage.service.get_by_name(service_name) + for active_execution in [e for e in service.executions if not e.has_ended()]: + self.model_storage.execution.refresh(active_execution) def verify_clean_storage(self): assert len(self.model_storage.service_template.list()) == 0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f29148af/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 61b7ce7..de40fcf 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -73,7 +73,7 @@ def test_task_graph_into_execution_graph(tmpdir): workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor) workflow_context.execution = execution - execution_tasks = topological_sort(workflow_context.graph) + execution_tasks = topological_sort(workflow_context._graph) assert len(execution_tasks) == 7