Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks d517b820e -> 15a1f66f9


optimization - try1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/15a1f66f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/15a1f66f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/15a1f66f

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 15a1f66f9ca8c51746a92cea59cf14b3c043b2fd
Parents: d517b82
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 15 18:44:51 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 15 18:44:51 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py      |   9 +-
 .../workflows/core/events_handler.py            | 129 +++++++++----------
 aria/orchestrator/workflows/executor/base.py    |   9 +-
 3 files changed, 72 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 48fb60a..e547aa1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -38,6 +38,7 @@ class Engine(logger.LoggerMixin):
     def __init__(self, executor, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._executors = {executor.__class__: executor}
+        self._executing_tasks = []
 
     def execute(self, ctx):
         """
@@ -88,7 +89,7 @@ class Engine(logger.LoggerMixin):
         )
 
     def _ended_tasks(self, ctx):
-        for task in self._tasks_iter(ctx):
+        for task in self._executing_tasks:
             if task.has_ended() and task in ctx._graph:
                 yield task
 
@@ -122,12 +123,14 @@ class Engine(logger.LoggerMixin):
             name=task.name
         )
 
+        self._executing_tasks.append(task)
+
         if not task.stub_type:
             events.sent_task_signal.send(op_ctx)
         executor.execute(op_ctx)
 
-    @staticmethod
-    def _handle_ended_tasks(ctx, task):
+    def _handle_ended_tasks(self, ctx, task):
+        self._executing_tasks.remove(task)
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
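
The net effect of this hunk: the engine keeps its own list of in-flight tasks
and polls only those, instead of re-walking every task in the execution graph
on each cycle. A rough sketch of the bookkeeping pattern, where Task is a
hypothetical stand-in for models.Task and the executor dispatch and the
graph-membership check are elided:

    class Task(object):
        def __init__(self, name):
            self.name = name
            self.ended = False

        def has_ended(self):
            return self.ended

    class Engine(object):
        def __init__(self):
            self._executing_tasks = []              # only in-flight tasks

        def _handle_executable_task(self, task):
            self._executing_tasks.append(task)      # register before dispatch
            # ... hand the task to the executor here ...

        def _ended_tasks(self):
            # O(in-flight tasks) per poll instead of O(all tasks in the graph)
            return [t for t in self._executing_tasks if t.has_ended()]

        def _handle_ended_task(self, task):
            self._executing_tasks.remove(task)      # stop tracking once handled

    engine = Engine()
    task = Task('install')
    engine._handle_executable_task(task)
    task.ended = True
    assert engine._ended_tasks() == [task]
    engine._handle_ended_task(task)
    assert engine._executing_tasks == []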

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 3a780d5..1f03167 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -31,108 +31,99 @@ from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(ctx, *args, **kwargs):
-    with ctx.track_changes:
-        ctx.task.status = ctx.task.SENT
+    ctx.task.status = ctx.task.SENT
 
 
 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    with ctx.track_changes:
-        ctx.task.started_at = datetime.utcnow()
-        ctx.task.status = ctx.task.STARTED
-        _update_node_state_if_necessary(ctx, is_transitional=True)
+    ctx.task.started_at = datetime.utcnow()
+    ctx.task.status = ctx.task.STARTED
+    _update_node_state_if_necessary(ctx, is_transitional=True)
 
 
 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    with ctx.track_changes:
-        should_retry = all([
-            not isinstance(exception, exceptions.TaskAbortException),
-            ctx.task.attempts_count < ctx.task.max_attempts or
-            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
-            # ignore_failure check here means the task will not be retried and it will be marked
-            # as failed. The engine will also look at ignore_failure so it won't fail the
-            # workflow.
-            not ctx.task.ignore_failure
-        ])
-        if should_retry:
-            retry_interval = None
-            if isinstance(exception, exceptions.TaskRetryException):
-                retry_interval = exception.retry_interval
-            if retry_interval is None:
-                retry_interval = ctx.task.retry_interval
-            ctx.task.status = ctx.task.RETRYING
-            ctx.task.attempts_count += 1
-            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
-        else:
-            ctx.task.ended_at = datetime.utcnow()
-            ctx.task.status = ctx.task.FAILED
+    should_retry = all([
+        not isinstance(exception, exceptions.TaskAbortException),
+        ctx.task.attempts_count < ctx.task.max_attempts or
+        ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+        # ignore_failure check here means the task will not be retried and it will be marked
+        # as failed. The engine will also look at ignore_failure so it won't fail the
+        # workflow.
+        not ctx.task.ignore_failure
+    ])
+    if should_retry:
+        retry_interval = None
+        if isinstance(exception, exceptions.TaskRetryException):
+            retry_interval = exception.retry_interval
+        if retry_interval is None:
+            retry_interval = ctx.task.retry_interval
+        ctx.task.status = ctx.task.RETRYING
+        ctx.task.attempts_count += 1
+        ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+    else:
+        ctx.task.ended_at = datetime.utcnow()
+        ctx.task.status = ctx.task.FAILED
 
 
 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    with ctx.track_changes:
-        ctx.task.ended_at = datetime.utcnow()
-        ctx.task.status = ctx.task.SUCCESS
+    ctx.task.ended_at = datetime.utcnow()
+    ctx.task.status = ctx.task.SUCCESS
 
-        _update_node_state_if_necessary(ctx)
+    _update_node_state_if_necessary(ctx)
 
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
-        execution = workflow_context.execution
-        # the execution may already be in the process of cancelling
-        if execution.status in (execution.CANCELLING, execution.CANCELLED):
-            return
-        execution.status = execution.STARTED
-        execution.started_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # the execution may already be in the process of cancelling
+    if execution.status in (execution.CANCELLING, execution.CANCELLED):
+        return
+    execution.status = execution.STARTED
+    execution.started_at = datetime.utcnow()
 
 
 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    with workflow_context.track_changes:
-        execution = workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    execution.ended_at = datetime.utcnow()
 
 
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
-        execution = workflow_context.execution
-        execution.status = execution.SUCCEEDED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.status = execution.SUCCEEDED
+    execution.ended_at = datetime.utcnow()
 
 
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
-        execution = workflow_context.execution
-        # _workflow_cancelling function may have called this function already
-        if execution.status == execution.CANCELLED:
-            return
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLED
-            execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # _workflow_cancelling function may have called this function already
+    if execution.status == execution.CANCELLED:
+        return
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
 
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
-        execution = workflow_context.execution
-        if execution.status == execution.PENDING:
-            return _workflow_cancelled(workflow_context=workflow_context)
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLING
-            workflow_context.execution = execution
+    execution = workflow_context.execution
+    if execution.status == execution.PENDING:
+        return _workflow_cancelled(workflow_context=workflow_context)
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLING
+        workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(ctx, is_transitional=False):
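
Every handler above loses its `with ctx.track_changes:` wrapper; the tracking
moves to the signal sender (see base.py below). This is safe because the
signals dispatch their receivers synchronously, so one context opened around
the send covers every connected handler. A minimal sketch under that
assumption, with a tiny Signal class standing in for the blinker-style signals
and track_changes as a hypothetical stand-in for the real change tracker:

    from contextlib import contextmanager

    class Signal(object):
        """Tiny synchronous signal, standing in for the real event signals."""
        def __init__(self):
            self._receivers = []

        def connect(self, fn):
            self._receivers.append(fn)
            return fn

        def send(self, ctx):
            for receiver in self._receivers:
                receiver(ctx)               # dispatched synchronously

    on_success_task_signal = Signal()

    @on_success_task_signal.connect
    def _task_succeeded(ctx):
        ctx.status = 'success'              # no per-handler `with` needed

    @contextmanager
    def track_changes(ctx):                 # hypothetical tracker
        yield
        print('flushed status=%s once, after all handlers' % ctx.status)

    class Ctx(object):
        status = None

    ctx = Ctx()
    with track_changes(ctx):                # opened once by the sender...
        on_success_task_signal.send(ctx)    # ...covers every receiver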

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/15a1f66f/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 9e1ce7e..dfb580c 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -51,15 +51,18 @@ class BaseExecutor(logger.LoggerMixin):
 
     @staticmethod
     def _task_started(ctx):
-        events.start_task_signal.send(ctx)
+        with ctx.track_changes:
+            events.start_task_signal.send(ctx)
 
     @staticmethod
     def _task_failed(ctx, exception, traceback=None):
-        events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+        with ctx.track_changes:
+            events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
 
     @staticmethod
     def _task_succeeded(ctx):
-        events.on_success_task_signal.send(ctx)
+        with ctx.track_changes:
+            events.on_success_task_signal.send(ctx)
 
 
 class StubTaskExecutor(BaseExecutor):                                                              # pylint: disable=abstract-method
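
And the executor side: each callback now opens the tracking context exactly
once around the signal send, so model changes made by all the handlers are
flushed in a single batch per event rather than once per handler. A sketch of
the property-returning-a-context-manager shape this relies on (Ctx and its
flush logic are hypothetical):

    from contextlib import contextmanager

    class Ctx(object):
        """Hypothetical context; the real one persists task/execution models."""
        def __init__(self):
            self.status = None
            self._dirty = []

        @property
        @contextmanager
        def track_changes(self):            # usable as `with ctx.track_changes:`
            yield
            print('flushing %d change(s)' % len(self._dirty))
            del self._dirty[:]

    class BaseExecutor(object):
        @staticmethod
        def _task_succeeded(ctx):
            with ctx.track_changes:         # one tracked batch per event
                ctx.status = 'success'
                ctx._dirty.append(('status', 'success'))

    BaseExecutor._task_succeeded(Ctx())     # -> flushing 1 change(s)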
