Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks d517b820e -> 15a1f66f9
optimization - try1 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/15a1f66f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/15a1f66f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/15a1f66f Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 15a1f66f9ca8c51746a92cea59cf14b3c043b2fd Parents: d517b82 Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 15 18:44:51 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 15 18:44:51 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/engine.py | 9 +- .../workflows/core/events_handler.py | 129 +++++++++---------- aria/orchestrator/workflows/executor/base.py | 9 +- 3 files changed, 72 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 48fb60a..e547aa1 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -38,6 +38,7 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, **kwargs): super(Engine, self).__init__(**kwargs) self._executors = {executor.__class__: executor} + self._executing_tasks = [] def execute(self, ctx): """ @@ -88,7 +89,7 @@ class Engine(logger.LoggerMixin): ) def _ended_tasks(self, ctx): - for task in self._tasks_iter(ctx): + for task in self._executing_tasks: if task.has_ended() and task in ctx._graph: yield task @@ -122,12 +123,14 @@ class Engine(logger.LoggerMixin): name=task.name ) + self._executing_tasks.append(task) + if not task.stub_type: events.sent_task_signal.send(op_ctx) executor.execute(op_ctx) - @staticmethod - def _handle_ended_tasks(ctx, task): + def _handle_ended_tasks(self, ctx, task): + self._executing_tasks.remove(task) if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 3a780d5..1f03167 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -31,108 +31,99 @@ from ... import exceptions @events.sent_task_signal.connect def _task_sent(ctx, *args, **kwargs): - with ctx.track_changes: - ctx.task.status = ctx.task.SENT + ctx.task.status = ctx.task.SENT @events.start_task_signal.connect def _task_started(ctx, *args, **kwargs): - with ctx.track_changes: - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - _update_node_state_if_necessary(ctx, is_transitional=True) + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + _update_node_state_if_necessary(ctx, is_transitional=True) @events.on_failure_task_signal.connect def _task_failed(ctx, exception, *args, **kwargs): - with ctx.track_changes: - should_retry = all([ - not isinstance(exception, exceptions.TaskAbortException), - ctx.task.attempts_count < ctx.task.max_attempts or - ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retried and it will be marked - # as failed. The engine will also look at ignore_failure so it won't fail the - # workflow. - not ctx.task.ignore_failure - ]) - if should_retry: - retry_interval = None - if isinstance(exception, exceptions.TaskRetryException): - retry_interval = exception.retry_interval - if retry_interval is None: - retry_interval = ctx.task.retry_interval - ctx.task.status = ctx.task.RETRYING - ctx.task.attempts_count += 1 - ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) - else: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.FAILED + should_retry = all([ + not isinstance(exception, exceptions.TaskAbortException), + ctx.task.attempts_count < ctx.task.max_attempts or + ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, + # ignore_failure check here means the task will not be retried and it will be marked + # as failed. The engine will also look at ignore_failure so it won't fail the + # workflow. + not ctx.task.ignore_failure + ]) + if should_retry: + retry_interval = None + if isinstance(exception, exceptions.TaskRetryException): + retry_interval = exception.retry_interval + if retry_interval is None: + retry_interval = ctx.task.retry_interval + ctx.task.status = ctx.task.RETRYING + ctx.task.attempts_count += 1 + ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) + else: + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.FAILED @events.on_success_task_signal.connect def _task_succeeded(ctx, *args, **kwargs): - with ctx.track_changes: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS - _update_node_state_if_necessary(ctx) + _update_node_state_if_necessary(ctx) @events.start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - with workflow_context.track_changes: - execution = workflow_context.execution - # the execution may already be in the process of cancelling - if execution.status in (execution.CANCELLING, execution.CANCELLED): - return - execution.status = execution.STARTED - execution.started_at = datetime.utcnow() + execution = workflow_context.execution + # the execution may already be in the process of cancelling + if execution.status in (execution.CANCELLING, execution.CANCELLED): + return + execution.status = execution.STARTED + execution.started_at = datetime.utcnow() @events.on_failure_workflow_signal.connect def _workflow_failed(workflow_context, exception, *args, **kwargs): - with workflow_context.track_changes: - execution = workflow_context.execution - execution.error = str(exception) - execution.status = execution.FAILED - execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + execution.error = str(exception) + execution.status = execution.FAILED + execution.ended_at = datetime.utcnow() @events.on_success_workflow_signal.connect def _workflow_succeeded(workflow_context, *args, **kwargs): - with workflow_context.track_changes: - execution = workflow_context.execution - execution.status = execution.SUCCEEDED - execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + execution.status = execution.SUCCEEDED + execution.ended_at = datetime.utcnow() @events.on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): - with workflow_context.track_changes: - execution = workflow_context.execution - # _workflow_cancelling function may have called this function already - if execution.status == execution.CANCELLED: - return - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + # _workflow_cancelling function may have called this function already + if execution.status == execution.CANCELLED: + return + # the execution may have already been finished + elif execution.status in (execution.SUCCEEDED, execution.FAILED): + _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) + else: + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): - with workflow_context.track_changes: - execution = workflow_context.execution - if execution.status == execution.PENDING: - return _workflow_cancelled(workflow_context=workflow_context) - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLING - workflow_context.execution = execution + execution = workflow_context.execution + if execution.status == execution.PENDING: + return _workflow_cancelled(workflow_context=workflow_context) + # the execution may have already been finished + elif execution.status in (execution.SUCCEEDED, execution.FAILED): + _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) + else: + execution.status = execution.CANCELLING + workflow_context.execution = execution def _update_node_state_if_necessary(ctx, is_transitional=False): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 9e1ce7e..dfb580c 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -51,15 +51,18 @@ class BaseExecutor(logger.LoggerMixin): @staticmethod def _task_started(ctx): - events.start_task_signal.send(ctx) + with ctx.track_changes: + events.start_task_signal.send(ctx) @staticmethod def _task_failed(ctx, exception, traceback=None): - events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) + with ctx.track_changes: + events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) @staticmethod def _task_succeeded(ctx): - events.on_success_task_signal.send(ctx) + with ctx.track_changes: + events.on_success_task_signal.send(ctx) class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method