ARIA-408 remove execution creation from workflow runner

As a part of this PR, the creation of the models and
the execution of the workflow were split. This enables creating
the execution and executing it from different processes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/33534111
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/33534111
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/33534111

Branch: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner
Commit: 33534111cbbd392f1f2cde39d5140cf0e2875962
Parents: 730750f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Nov 19 10:09:54 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Nov 21 18:50:07 2017 +0200

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 | 101 ++-
 aria/orchestrator/execution_preparer.py         | 170 +++++
 aria/orchestrator/workflow_runner.py            | 188 -----
 aria/orchestrator/workflows/core/engine.py      |   4 +-
 docs/aria.orchestrator.rst                      |   6 +-
 test_ssh.py                                     | 528 --------------
 tests/end2end/testenv.py                        |   1 +
 tests/orchestrator/context/__init__.py          |   2 +-
 tests/orchestrator/context/test_serialize.py    |   2 +-
 tests/orchestrator/execution/__init__.py        |  14 +
 .../execution/test_execution_compiler.py        | 628 ++++++++++++++++
 .../orchestrator/execution_plugin/test_local.py |   2 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   2 +-
 tests/orchestrator/test_workflow_runner.py      | 726 -------------------
 .../orchestrator/workflows/core/test_engine.py  |   2 +-
 .../orchestrator/workflows/core/test_events.py  |   2 +-
 .../executor/test_process_executor_extension.py |   2 +-
 .../test_process_executor_tracked_changes.py    |   2 +-
 18 files changed, 887 insertions(+), 1495 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index cecbbc5..2415e19 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -25,9 +25,11 @@ from .. import utils
 from .. import logger as cli_logger
 from .. import execution_logging
 from ..core import aria
+from ...orchestrator import execution_preparer
 from ...modeling.models import Execution
-from ...orchestrator.workflow_runner import WorkflowRunner
+from ...orchestrator.workflows.core.engine import Engine
 from ...orchestrator.workflows.executor.dry import DryExecutor
+from ...orchestrator.workflows.executor.process import ProcessExecutor
 from ...utils import formatting
 from ...utils import threading
 
@@ -141,17 +143,21 @@ def start(workflow_name,
     WORKFLOW_NAME is the unique name of the workflow within the service (e.g. 
"uninstall").
     """
     service = model_storage.service.get_by_name(service_name)
-    executor = DryExecutor() if dry else None  # use WorkflowRunner's default 
executor
-
-    workflow_runner = \
-        WorkflowRunner(
-            model_storage, resource_storage, plugin_manager,
-            service_id=service.id, workflow_name=workflow_name, inputs=inputs, 
executor=executor,
-            task_max_attempts=task_max_attempts, 
task_retry_interval=task_retry_interval
-        )
+    executor = DryExecutor() if dry else 
ProcessExecutor(plugin_manager=plugin_manager)
+
+    compiler = execution_preparer.ExecutionPreparer(
+        model_storage,
+        resource_storage,
+        plugin_manager,
+        service,
+        workflow_name
+    )
+    workflow_ctx = compiler.prepare(inputs, executor=executor)
+
+    engine = Engine(executor)
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if 
dry else ''))
 
-    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(engine, workflow_ctx, logger, model_storage, dry, 
mark_pattern)
 
 
 @executions.command(name='resume',
@@ -178,45 +184,61 @@ def resume(execution_id,
 
     EXECUTION_ID is the unique ID of the execution.
     """
-    executor = DryExecutor() if dry else None  # use WorkflowRunner's default 
executor
+    executor = DryExecutor() if dry else 
ProcessExecutor(plugin_manager=plugin_manager)
 
-    execution = model_storage.execution.get(execution_id)
-    if execution.status != execution.CANCELLED:
+    execution_to_resume = model_storage.execution.get(execution_id)
+    if execution_to_resume.status != execution_to_resume.CANCELLED:
         logger.info("Can't resume execution {execution.id} - "
                     "execution is in status {execution.status}. "
-                    "Can only resume executions in status {valid_status}"
-                    .format(execution=execution, 
valid_status=execution.CANCELLED))
+                    "Can only resume executions in status 
{execution.CANCELLED}"
+                    .format(execution=execution_to_resume))
         return
 
-    workflow_runner = \
-        WorkflowRunner(
-            model_storage, resource_storage, plugin_manager,
-            execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, 
executor=executor,
-        )
-
-    logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if 
dry else ''))
-    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+    workflow_ctx = execution_preparer.ExecutionPreparer(
+        model_storage,
+        resource_storage,
+        plugin_manager,
+        execution_to_resume.service,
+        execution_to_resume.workflow_name
+    ).prepare(execution_id=execution_to_resume.id)
 
+    engine = Engine(executor)
 
-def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
-    execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
-                                             
workflow_runner.execution.workflow_name)
-    execution_thread = 
threading.ExceptionThread(target=workflow_runner.execute,
-                                                 name=execution_thread_name)
+    logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if 
dry else ''))
+    _run_execution(engine, workflow_ctx, logger, model_storage, dry, 
mark_pattern,
+                   engine_kwargs=dict(resuming=True, 
retry_failed=retry_failed_tasks))
+
+
+def _run_execution(
+        engine,
+        ctx,
+        logger,
+        model_storage,
+        dry,
+        mark_pattern,
+        engine_kwargs=None
+):
+    engine_kwargs = engine_kwargs or {}
+    engine_kwargs['ctx'] = ctx
+    execution_thread_name = '{0}_{1}'.format(ctx.execution.service.name,
+                                             ctx.execution.workflow_name)
+    execution_thread = threading.ExceptionThread(target=engine.execute,
+                                                 name=execution_thread_name,
+                                                 kwargs=engine_kwargs)
 
     execution_thread.start()
 
-    last_task_id = workflow_runner.execution.logs[-1].id if 
workflow_runner.execution.logs else 0
-    log_iterator = cli_logger.ModelLogIterator(model_storage,
-                                               workflow_runner.execution_id,
-                                               offset=last_task_id)
+    last_task_id = ctx.execution.logs[-1].id if ctx.execution.logs else 0
+    log_iterator = cli_logger.ModelLogIterator(model_storage, 
ctx.execution.id, offset=last_task_id)
     try:
         while execution_thread.is_alive():
             execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
             execution_thread.join(1)
 
     except KeyboardInterrupt:
-        _cancel_execution(workflow_runner, execution_thread, logger, 
log_iterator)
+        _cancel_execution(engine, ctx, execution_thread, logger, log_iterator)
+
+    model_storage.execution.refresh(ctx.execution)
 
     # It might be the case where some logs were written and the execution was 
terminated, thus we
     # need to drain the remaining logs.
@@ -225,19 +247,18 @@ def _run_execution(workflow_runner, logger, 
model_storage, dry, mark_pattern):
     # raise any errors from the execution thread (note these are not workflow 
execution errors)
     execution_thread.raise_error_if_exists()
 
-    execution = workflow_runner.execution
-    logger.info('Execution has ended with "{0}" 
status'.format(execution.status))
-    if execution.status == Execution.FAILED and execution.error:
-        logger.info('Execution error:{0}{1}'.format(os.linesep, 
execution.error))
+    logger.info('Execution has ended with "{0}" 
status'.format(ctx.execution.status))
+    if ctx.execution.status == Execution.FAILED and ctx.execution.error:
+        logger.info('Execution error:{0}{1}'.format(os.linesep, 
ctx.execution.error))
 
     if dry:
         # remove traces of the dry execution (including tasks, logs, inputs..)
-        model_storage.execution.delete(execution)
+        model_storage.execution.delete(ctx.execution)
 
 
-def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator):
+def _cancel_execution(engine, ctx, execution_thread, logger, log_iterator):
     logger.info('Cancelling execution. Press Ctrl+C again to force-cancel.')
-    workflow_runner.cancel()
+    engine.cancel_execution(ctx)
     while execution_thread.is_alive():
         try:
             execution_logging.log_list(log_iterator)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/aria/orchestrator/execution_preparer.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_preparer.py 
b/aria/orchestrator/execution_preparer.py
new file mode 100644
index 0000000..145577a
--- /dev/null
+++ b/aria/orchestrator/execution_preparer.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionPreparer(object):
+    """
+    This class manages any execution and tasks related preparation for an 
execution of a workflow.
+    """
+    def __init__(
+            self,
+            model_storage,
+            resource_storage,
+            plugin_manager,
+            service,
+            workflow_name,
+            task_max_attempts=None,
+            task_retry_interval=None
+    ):
+        self._model = model_storage
+        self._resource = resource_storage
+        self._plugin = plugin_manager
+        self._service = service
+        self._workflow_name = workflow_name
+        self._task_max_attempts = task_max_attempts or 
DEFAULT_TASK_MAX_ATTEMPTS
+        self._task_retry_interval = task_retry_interval or 
DEFAULT_TASK_RETRY_INTERVAL
+
+    def get_workflow_ctx(self, execution):
+        return WorkflowContext(
+            name=self._workflow_name,
+            model_storage=self._model,
+            resource_storage=self._resource,
+            service_id=execution.service.id,
+            execution_id=execution.id,
+            workflow_name=execution.workflow_name,
+            task_max_attempts=self._task_max_attempts,
+            task_retry_interval=self._task_retry_interval,
+        )
+
+    def prepare(self, execution_inputs=None, executor=None, execution_id=None):
+        """
+        If there is a need to prepare a new execution (e.g. execution_id is 
not provided),
+        a new execution and task models are created. Any any case, a 
corresponding workflow
+        context is returned.
+
+        :param execution_inputs: inputs for the execution.
+        :param executor: the execution for the tasks
+        :param execution_id: used for an existing execution (mainly for 
resuming).
+        :return:
+        """
+        assert not (execution_inputs and executor and execution_id)
+
+        if execution_id is None:
+            # If the execution is new
+            execution = self._create_execution_model(execution_inputs)
+            self._model.execution.put(execution)
+            ctx = self.get_workflow_ctx(execution)
+            self._create_tasks(ctx, executor)
+            self._model.execution.update(execution)
+        else:
+            # If resuming an execution
+            execution = self._model.execution.get(execution_id)
+            ctx = self.get_workflow_ctx(execution)
+
+        return ctx
+
+    def _create_tasks(self, ctx, executor=None):
+
+        # Set default executor and kwargs
+        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+        # transforming the execution inputs to dict, to pass them to the 
workflow function
+        execution_inputs_dict = dict(inp.unwrapped for inp in 
ctx.execution.inputs.itervalues())
+
+        workflow_fn = self._get_workflow_fn(ctx.execution.workflow_name)
+        api_tasks_graph = workflow_fn(ctx=ctx, **execution_inputs_dict)
+        compiler = graph_compiler.GraphCompiler(ctx, executor.__class__)
+        compiler.compile(api_tasks_graph)
+
+    def _create_execution_model(self, inputs=None):
+        self._validate_workflow_exists_for_service()
+        self._validate_no_active_executions()
+
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service_fk=self._service.id,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any 
inputs
+        else:
+            workflow_inputs = 
self._service.workflows[self._workflow_name].inputs
+
+        
modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                     supplied_inputs=inputs or 
{})
+        
modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                             
supplied_inputs=inputs or {})
+        execution.inputs = modeling_utils.merge_parameter_values(
+            inputs, workflow_inputs, model_cls=models.Input)
+
+        return execution
+
+    def _validate_no_active_executions(self):
+        active_executions = [e for e in self._service.executions if
+                             e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution 
with ID {1}"
+                .format(self._service.name, active_executions[0].id))
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self._service.workflows and \
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                .format(self._workflow_name, self._service.name))
+
+    def _get_workflow_fn(self, workflow_name):
+        if workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return 
import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    workflow_name))
+
+        workflow = self._service.workflows[workflow_name]
+
+        # TODO: Custom workflow support needs improvement, currently this code 
uses internal
+        # knowledge of the resource storage; Instead, workflows should 
probably be loaded
+        # in a similar manner to operation plugins. Also consider passing to 
import_fullname
+        # as paths instead of appending to sys path.
+        service_template_resources_path = os.path.join(
+            self._resource.service_template.base_path,
+            str(self._service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.function)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} function at {1}'.format(
+                    workflow_name, workflow.function))
+
+        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py 
b/aria/orchestrator/workflow_runner.py
deleted file mode 100644
index 4dbf29b..0000000
--- a/aria/orchestrator/workflow_runner.py
+++ /dev/null
@@ -1,188 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Running workflows.
-"""
-
-import os
-import sys
-from datetime import datetime
-
-from . import exceptions
-from .context.workflow import WorkflowContext
-from .workflows import builtin
-from .workflows.core import engine, graph_compiler
-from .workflows.executor.process import ProcessExecutor
-from ..modeling import models
-from ..modeling import utils as modeling_utils
-from ..utils.imports import import_fullname
-
-
-DEFAULT_TASK_MAX_ATTEMPTS = 30
-DEFAULT_TASK_RETRY_INTERVAL = 30
-
-
-class WorkflowRunner(object):
-
-    def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, retry_failed_tasks=False,
-                 service_id=None, workflow_name=None, inputs=None, 
executor=None,
-                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
-                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
-        """
-        Manages a single workflow execution on a given service.
-
-        :param workflow_name: workflow name
-        :param service_id: service ID
-        :param inputs: key-value dict of inputs for the execution
-        :param model_storage: model storage API ("MAPI")
-        :param resource_storage: resource storage API ("RAPI")
-        :param plugin_manager: plugin manager
-        :param executor: executor for tasks; defaults to a
-         
:class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
-        :param task_max_attempts: maximum attempts of repeating each failing 
task
-        :param task_retry_interval: retry interval between retry attempts of a 
failing task
-        """
-
-        if not (execution_id or (workflow_name and service_id)):
-            exceptions.InvalidWorkflowRunnerParams(
-                "Either provide execution id in order to resume a workflow or 
workflow name "
-                "and service id with inputs")
-
-        self._is_resume = execution_id is not None
-        self._retry_failed_tasks = retry_failed_tasks
-
-        self._model_storage = model_storage
-        self._resource_storage = resource_storage
-
-        # the IDs are stored rather than the models themselves, so this module 
could be used
-        # by several threads without raising errors on model objects shared 
between threads
-
-        if self._is_resume:
-            self._execution_id = execution_id
-            self._service_id = self.execution.service.id
-            self._workflow_name = 
model_storage.execution.get(self._execution_id).workflow_name
-        else:
-            self._service_id = service_id
-            self._workflow_name = workflow_name
-            self._validate_workflow_exists_for_service()
-            self._execution_id = self._create_execution_model(inputs).id
-
-        self._workflow_context = WorkflowContext(
-            name=self.__class__.__name__,
-            model_storage=self._model_storage,
-            resource_storage=resource_storage,
-            service_id=service_id,
-            execution_id=self._execution_id,
-            workflow_name=self._workflow_name,
-            task_max_attempts=task_max_attempts,
-            task_retry_interval=task_retry_interval)
-
-        # Set default executor and kwargs
-        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
-
-        # transforming the execution inputs to dict, to pass them to the 
workflow function
-        execution_inputs_dict = dict(inp.unwrapped for inp in 
self.execution.inputs.itervalues())
-
-        if not self._is_resume:
-            workflow_fn = self._get_workflow_fn()
-            self._tasks_graph = workflow_fn(ctx=self._workflow_context, 
**execution_inputs_dict)
-            compiler = graph_compiler.GraphCompiler(self._workflow_context, 
executor.__class__)
-            compiler.compile(self._tasks_graph)
-
-        self._engine = engine.Engine(executors={executor.__class__: executor})
-
-    @property
-    def execution_id(self):
-        return self._execution_id
-
-    @property
-    def execution(self):
-        return self._model_storage.execution.get(self.execution_id)
-
-    @property
-    def service(self):
-        return self._model_storage.service.get(self._service_id)
-
-    def execute(self):
-        self._engine.execute(ctx=self._workflow_context,
-                             resuming=self._is_resume,
-                             retry_failed=self._retry_failed_tasks)
-
-    def cancel(self):
-        self._engine.cancel_execution(ctx=self._workflow_context)
-
-    def _create_execution_model(self, inputs):
-        execution = models.Execution(
-            created_at=datetime.utcnow(),
-            service=self.service,
-            workflow_name=self._workflow_name,
-            inputs={})
-
-        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
-            workflow_inputs = dict()  # built-in workflows don't have any 
inputs
-        else:
-            workflow_inputs = 
self.service.workflows[self._workflow_name].inputs
-
-        
modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
-                                                     supplied_inputs=inputs or 
{})
-        
modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
-                                                             
supplied_inputs=inputs or {})
-        execution.inputs = modeling_utils.merge_parameter_values(
-            inputs, workflow_inputs, model_cls=models.Input)
-        # TODO: these two following calls should execute atomically
-        self._validate_no_active_executions(execution)
-        self._model_storage.execution.put(execution)
-        return execution
-
-    def _validate_workflow_exists_for_service(self):
-        if self._workflow_name not in self.service.workflows and \
-                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
-            raise exceptions.UndeclaredWorkflowError(
-                'No workflow policy {0} declared in service {1}'
-                .format(self._workflow_name, self.service.name))
-
-    def _validate_no_active_executions(self, execution):
-        active_executions = [e for e in self.service.executions if 
e.is_active()]
-        if active_executions:
-            raise exceptions.ActiveExecutionsError(
-                "Can't start execution; Service {0} has an active execution 
with ID {1}"
-                .format(self.service.name, active_executions[0].id))
-
-    def _get_workflow_fn(self):
-        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
-            return 
import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
-                                                    self._workflow_name))
-
-        workflow = self.service.workflows[self._workflow_name]
-
-        # TODO: Custom workflow support needs improvement, currently this code 
uses internal
-        # knowledge of the resource storage; Instead, workflows should 
probably be loaded
-        # in a similar manner to operation plugins. Also consider passing to 
import_fullname
-        # as paths instead of appending to sys path.
-        service_template_resources_path = os.path.join(
-            self._resource_storage.service_template.base_path,
-            str(self.service.service_template.id))
-        sys.path.append(service_template_resources_path)
-
-        try:
-            workflow_fn = import_fullname(workflow.function)
-        except ImportError:
-            raise exceptions.WorkflowImplementationNotFoundError(
-                'Could not find workflow {0} function at {1}'.format(
-                    self._workflow_name, workflow.function))
-
-        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py 
b/aria/orchestrator/workflows/core/engine.py
index 0ec3cd8..0d7d2ae 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -36,9 +36,9 @@ class Engine(logger.LoggerMixin):
     Executes workflows.
     """
 
-    def __init__(self, executors, **kwargs):
+    def __init__(self, *executors, **kwargs):
         super(Engine, self).__init__(**kwargs)
-        self._executors = executors.copy()
+        self._executors = dict((e.__class__, e) for e in executors)
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
     def execute(self, ctx, resuming=False, retry_failed=False):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/docs/aria.orchestrator.rst
----------------------------------------------------------------------
diff --git a/docs/aria.orchestrator.rst b/docs/aria.orchestrator.rst
index 33454e6..6e6d659 100644
--- a/docs/aria.orchestrator.rst
+++ b/docs/aria.orchestrator.rst
@@ -40,7 +40,7 @@
 
 .. automodule:: aria.orchestrator.plugin
 
-:mod:`aria.orchestrator.workflow_runner`
-----------------------------------------
+:mod:`aria.orchestrator.execution_preparer`
+-------------------------------------------
 
-.. automodule:: aria.orchestrator.workflow_runner
+.. automodule:: aria.orchestrator.execution_preparer

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/test_ssh.py
----------------------------------------------------------------------
diff --git a/test_ssh.py b/test_ssh.py
deleted file mode 100644
index 5256cf8..0000000
--- a/test_ssh.py
+++ /dev/null
@@ -1,528 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import contextlib
-import json
-import logging
-import os
-
-import pytest
-
-import fabric.api
-from fabric.contrib import files
-from fabric import context_managers
-
-from aria.modeling import models
-from aria.orchestrator import events
-from aria.orchestrator import workflow
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.exceptions import ExecutorException
-from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
-from aria.orchestrator.execution_plugin import operations
-from aria.orchestrator.execution_plugin import constants
-from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException
-from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations
-
-from tests import mock, storage, resources
-from tests.orchestrator.workflows.helpers import events_collector
-
-_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx'
-
-import tests
-KEY_FILENAME = os.path.join(tests.ROOT_DIR, 'tests/resources/keys/test')
-
-_FABRIC_ENV = {
-    'disable_known_hosts': True,
-    'user': 'test',
-    'key_filename': KEY_FILENAME
-}
-
-
-import mockssh
-@pytest.fixture(scope='session')
-def server():
-    with mockssh.Server({'test': KEY_FILENAME}) as s:
-        yield s
-
-
-#@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required')
-class TestWithActualSSHServer(object):
-
-    def test_run_script_basic(self):
-        expected_attribute_value = 'some_value'
-        props = self._execute(env={'test_value': expected_attribute_value})
-        assert props['test_value'].value == expected_attribute_value
-
-    @pytest.mark.skip(reason='sudo privileges are required')
-    def test_run_script_as_sudo(self):
-        self._execute(use_sudo=True)
-        with self._ssh_env():
-            assert files.exists('/opt/test_dir')
-            fabric.api.sudo('rm -rf /opt/test_dir')
-
-    def test_run_script_default_base_dir(self):
-        props = self._execute()
-        assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
-
-    @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
-    @pytest.mark.parametrize('hide_groups', [[], ['everything']])
-    def test_run_script_with_hide(self, hide_groups):
-        self._execute(hide_output=hide_groups)
-        output = 'TODO'
-        expected_log_message = ('[localhost] run: source {0}/scripts/'
-                                .format(constants.DEFAULT_BASE_DIR))
-        if hide_groups:
-            assert expected_log_message not in output
-        else:
-            assert expected_log_message in output
-
-    def test_run_script_process_config(self):
-        expected_env_value = 'test_value_env'
-        expected_arg1_value = 'test_value_arg1'
-        expected_arg2_value = 'test_value_arg2'
-        expected_cwd = '/tmp'
-        expected_base_dir = _CUSTOM_BASE_DIR
-        props = self._execute(
-            env={'test_value_env': expected_env_value},
-            process={
-                'args': [expected_arg1_value, expected_arg2_value],
-                'cwd': expected_cwd,
-                'base_dir': expected_base_dir
-            })
-        assert props['env_value'].value == expected_env_value
-        assert len(props['bash_version'].value) > 0
-        assert props['arg1_value'].value == expected_arg1_value
-        assert props['arg2_value'].value == expected_arg2_value
-        assert props['cwd'].value == expected_cwd
-        assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
-
-    def test_run_script_command_prefix(self):
-        props = self._execute(process={'command_prefix': 'bash -i'})
-        assert 'i' in props['dollar_dash'].value
-
-    def test_run_script_reuse_existing_ctx(self):
-        expected_test_value_1 = 'test_value_1'
-        expected_test_value_2 = 'test_value_2'
-        props = self._execute(
-            test_operations=['{0}_1'.format(self.test_name),
-                             '{0}_2'.format(self.test_name)],
-            env={'test_value1': expected_test_value_1,
-                 'test_value2': expected_test_value_2})
-        assert props['test_value1'].value == expected_test_value_1
-        assert props['test_value2'].value == expected_test_value_2
-
-    def test_run_script_download_resource_plain(self, tmpdir):
-        resource = tmpdir.join('resource')
-        resource.write('content')
-        self._upload(str(resource), 'test_resource')
-        props = self._execute()
-        assert props['test_value'].value == 'content'
-
-    def test_run_script_download_resource_and_render(self, tmpdir):
-        resource = tmpdir.join('resource')
-        resource.write('{{ctx.service.name}}')
-        self._upload(str(resource), 'test_resource')
-        props = self._execute()
-        assert props['test_value'].value == self._workflow_context.service.name
-
-    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
-    def test_run_script_inputs_as_env_variables_no_override(self, value):
-        props = self._execute(custom_input=value)
-        return_value = props['test_value'].value
-        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
-        assert value == expected
-
-    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
-    def test_run_script_inputs_as_env_variables_process_env_override(self, value):
-        props = self._execute(custom_input='custom-input-value',
-                              env={'custom_env_var': value})
-        return_value = props['test_value'].value
-        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
-        assert value == expected
-
-    def test_run_script_error_in_script(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskException)
-
-    def test_run_script_abort_immediate(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskAbortException)
-        assert exception.message == 'abort-message'
-
-    def test_run_script_retry(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskRetryException)
-        assert exception.message == 'retry-message'
-
-    def test_run_script_abort_error_ignored_by_script(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskAbortException)
-        assert exception.message == 'abort-message'
-
-    def test_run_commands(self):
-        temp_file_path = '/tmp/very_temporary_file'
-        with self._ssh_env():
-            if files.exists(temp_file_path):
-                fabric.api.run('rm {0}'.format(temp_file_path))
-        self._execute(commands=['touch {0}'.format(temp_file_path)])
-        with self._ssh_env():
-            assert files.exists(temp_file_path)
-            fabric.api.run('rm {0}'.format(temp_file_path))
-
-    @pytest.fixture(autouse=True)
-    def _setup(self, request, workflow_context, executor, capfd, server):
-        print 'HI!!!!!!!!!!', server.port
-        self._workflow_context = workflow_context
-        self._executor = executor
-        self._capfd = capfd
-        self.test_name = request.node.originalname or request.node.name
-        with self._ssh_env(server):
-            for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]:
-                if files.exists(directory):
-                    fabric.api.run('rm -rf {0}'.format(directory))
-
-    @contextlib.contextmanager
-    def _ssh_env(self, server):
-        with self._capfd.disabled():
-            with context_managers.settings(fabric.api.hide('everything'),
-                                           host_string='localhost:{0}'.format(server.port),
-                                           **_FABRIC_ENV):
-                yield
-
-    def _execute(self,
-                 env=None,
-                 use_sudo=False,
-                 hide_output=None,
-                 process=None,
-                 custom_input='',
-                 test_operations=None,
-                 commands=None):
-        process = process or {}
-        if env:
-            process.setdefault('env', {}).update(env)
-
-        test_operations = test_operations or [self.test_name]
-
-        local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh')
-        script_path = os.path.basename(local_script_path)
-        self._upload(local_script_path, script_path)
-
-        if commands:
-            operation = operations.run_commands_with_ssh
-        else:
-            operation = operations.run_script_with_ssh
-
-        node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-        arguments = {
-            'script_path': script_path,
-            'fabric_env': _FABRIC_ENV,
-            'process': process,
-            'use_sudo': use_sudo,
-            'custom_env_var': custom_input,
-            'test_operation': '',
-        }
-        if hide_output:
-            arguments['hide_output'] = hide_output
-        if commands:
-            arguments['commands'] = commands
-        interface = mock.models.create_interface(
-            node.service,
-            'test',
-            'op',
-            operation_kwargs=dict(
-                function='{0}.{1}'.format(
-                    operations.__name__,
-                    operation.__name__),
-                arguments=arguments)
-        )
-        node.interfaces[interface.name] = interface
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            ops = []
-            for test_operation in test_operations:
-                op_arguments = arguments.copy()
-                op_arguments['test_operation'] = test_operation
-                ops.append(api.task.OperationTask(
-                    node,
-                    interface_name='test',
-                    operation_name='op',
-                    arguments=op_arguments))
-
-            graph.sequence(*ops)
-            return graph
-        tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
-        graph_compiler.GraphCompiler(
-            self._workflow_context, self._executor.__class__).compile(tasks_graph)
-        eng = engine.Engine({self._executor.__class__: self._executor})
-        eng.execute(self._workflow_context)
-        return self._workflow_context.model.node.get_by_name(
-            mock.models.DEPENDENCY_NODE_NAME).attributes
-
-    def _execute_and_get_task_exception(self, *args, **kwargs):
-        signal = events.on_failure_task_signal
-        with events_collector(signal) as collected:
-            with pytest.raises(ExecutorException):
-                self._execute(*args, **kwargs)
-        return collected[signal][0]['kwargs']['exception']
-
-    def _upload(self, source, path):
-        self._workflow_context.resource.service.upload(
-            entry_id=str(self._workflow_context.service.id),
-            source=source,
-            path=path)
-
-    @pytest.fixture
-    def executor(self):
-        result = process.ProcessExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @pytest.fixture
-    def workflow_context(self, tmpdir):
-        workflow_context = mock.context.simple(str(tmpdir))
-        workflow_context.states = []
-        workflow_context.exception = None
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-
-class TestFabricEnvHideGroupsAndRunCommands(object):
-
-    def test_fabric_env_default_override(self):
-        # first sanity for no override
-        self._run()
-        assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout']
-        # now override
-        invocation_fabric_env = self.default_fabric_env.copy()
-        timeout = 1000000
-        invocation_fabric_env['timeout'] = timeout
-        self._run(fabric_env=invocation_fabric_env)
-        assert self.mock.settings_merged['timeout'] == timeout
-
-    def test_implicit_host_string(self, mocker):
-        expected_host_address = '1.1.1.1'
-        mocker.patch.object(self._Ctx.task.actor, 'host')
-        mocker.patch.object(self._Ctx.task.actor.host, 'host_address', expected_host_address)
-        fabric_env = self.default_fabric_env.copy()
-        del fabric_env['host_string']
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['host_string'] == expected_host_address
-
-    def test_explicit_host_string(self):
-        fabric_env = self.default_fabric_env.copy()
-        host_string = 'explicit_host_string'
-        fabric_env['host_string'] = host_string
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['host_string'] == host_string
-
-    def test_override_warn_only(self):
-        fabric_env = self.default_fabric_env.copy()
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['warn_only'] is True
-        fabric_env = self.default_fabric_env.copy()
-        fabric_env['warn_only'] = False
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['warn_only'] is False
-
-    def test_missing_host_string(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['host_string']
-            self._run(fabric_env=fabric_env)
-        assert '`host_string` not supplied' in str(exc_ctx.value)
-
-    def test_missing_user(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['user']
-            self._run(fabric_env=fabric_env)
-        assert '`user` not supplied' in str(exc_ctx.value)
-
-    def test_missing_key_or_password(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['key_filename']
-            self._run(fabric_env=fabric_env)
-        assert 'Access credentials not supplied' in str(exc_ctx.value)
-
-    def test_hide_in_settings_and_non_viable_groups(self):
-        groups = ('running', 'stdout')
-        self._run(hide_output=groups)
-        assert set(self.mock.settings_merged['hide_output']) == set(groups)
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            self._run(hide_output=('running', 'bla'))
-        assert '`hide_output` must be a subset of' in str(exc_ctx.value)
-
-    def test_run_commands(self):
-        def test(use_sudo):
-            commands = ['command1', 'command2']
-            self._run(
-                commands=commands,
-                use_sudo=use_sudo)
-            assert all(item in self.mock.settings_merged.items() for
-                       item in self.default_fabric_env.items())
-            assert self.mock.settings_merged['warn_only'] is True
-            assert self.mock.settings_merged['use_sudo'] == use_sudo
-            assert self.mock.commands == commands
-            self.mock.settings_merged = {}
-            self.mock.commands = []
-        test(use_sudo=False)
-        test(use_sudo=True)
-
-    def test_failed_command(self):
-        with pytest.raises(ProcessException) as exc_ctx:
-            self._run(commands=['fail'])
-        exception = exc_ctx.value
-        assert exception.stdout == self.MockCommandResult.stdout
-        assert exception.stderr == self.MockCommandResult.stderr
-        assert exception.command == self.MockCommandResult.command
-        assert exception.exit_code == self.MockCommandResult.return_code
-
-    class MockCommandResult(object):
-        stdout = 'mock_stdout'
-        stderr = 'mock_stderr'
-        command = 'mock_command'
-        return_code = 1
-
-        def __init__(self, failed):
-            self.failed = failed
-
-    class MockFabricApi(object):
-
-        def __init__(self):
-            self.commands = []
-            self.settings_merged = {}
-
-        @contextlib.contextmanager
-        def settings(self, *args, **kwargs):
-            self.settings_merged.update(kwargs)
-            if args:
-                groups = args[0]
-                self.settings_merged.update({'hide_output': groups})
-            yield
-
-        def run(self, command):
-            self.commands.append(command)
-            self.settings_merged['use_sudo'] = False
-            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
-        def sudo(self, command):
-            self.commands.append(command)
-            self.settings_merged['use_sudo'] = True
-            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
-        def hide(self, *groups):
-            return groups
-
-        def exists(self, *args, **kwargs):
-            raise RuntimeError
-
-    class _Ctx(object):
-        INSTRUMENTATION_FIELDS = ()
-
-        class Task(object):
-            @staticmethod
-            def abort(message=None):
-                models.Task.abort(message)
-            actor = None
-
-        class Actor(object):
-            host = None
-
-        class Model(object):
-            @contextlib.contextmanager
-            def instrument(self, *args, **kwargs):
-                yield
-        task = Task
-        task.actor = Actor
-        model = Model()
-        logger = logging.getLogger()
-
-    @staticmethod
-    @contextlib.contextmanager
-    def _mock_self_logging(*args, **kwargs):
-        yield
-    _Ctx.logging_handlers = _mock_self_logging
-
-    @pytest.fixture(autouse=True)
-    def _setup(self, mocker):
-        self.default_fabric_env = {
-            'host_string': 'test',
-            'user': 'test',
-            'key_filename': 'test',
-        }
-        self.mock = self.MockFabricApi()
-        mocker.patch('fabric.api', self.mock)
-
-    def _run(self,
-             commands=(),
-             fabric_env=None,
-             process=None,
-             use_sudo=False,
-             hide_output=None):
-        operations.run_commands_with_ssh(
-            ctx=self._Ctx,
-            commands=commands,
-            process=process,
-            fabric_env=fabric_env or self.default_fabric_env,
-            use_sudo=use_sudo,
-            hide_output=hide_output)
-
-
-class TestUtilityFunctions(object):
-
-    def test_paths(self):
-        base_dir = '/path'
-        local_script_path = '/local/script/path.py'
-        paths = ssh_operations._Paths(base_dir=base_dir,
-                                      local_script_path=local_script_path)
-        assert paths.local_script_path == local_script_path
-        assert paths.remote_ctx_dir == base_dir
-        assert paths.base_script_path == 'path.py'
-        assert paths.remote_ctx_path == '/path/ctx'
-        assert paths.remote_scripts_dir == '/path/scripts'
-        assert paths.remote_work_dir == '/path/work'
-        assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-')
-        assert paths.remote_script_path.startswith('/path/scripts/path.py-')
-
-    def test_write_environment_script_file(self):
-        base_dir = '/path'
-        local_script_path = '/local/script/path.py'
-        paths = ssh_operations._Paths(base_dir=base_dir,
-                                      local_script_path=local_script_path)
-        env = {'one': "'1'"}
-        local_socket_url = 'local_socket_url'
-        remote_socket_url = 'remote_socket_url'
-        env_script_lines = set([l for l in ssh_operations._write_environment_script_file(
-            process={'env': env},
-            paths=paths,
-            local_socket_url=local_socket_url,
-            remote_socket_url=remote_socket_url
-        ).getvalue().split('\n') if l])
-        expected_env_script_lines = set([
-            'export PATH=/path:$PATH',
-            'export PYTHONPATH=/path:$PYTHONPATH',
-            'chmod +x /path/ctx',
-            'chmod +x {0}'.format(paths.remote_script_path),
-            'export CTX_SOCKET_URL={0}'.format(remote_socket_url),
-            'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url),
-            'export one=\'1\''
-        ])
-        assert env_script_lines == expected_env_script_lines

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/end2end/testenv.py
----------------------------------------------------------------------
diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py
index 43ec274..c3d055d 100644
--- a/tests/end2end/testenv.py
+++ b/tests/end2end/testenv.py
@@ -68,6 +68,7 @@ class TestEnvironment(object):
         assert len(self.model_storage.node_template.list()) == 0
         assert len(self.model_storage.node.list()) == 0
         assert len(self.model_storage.log.list()) == 0
+        assert len(self.model_storage.task.list()) == 0
 
     def _get_cli(self):
         cli = sh.aria.bake('-vvv', _out=sys.stdout, _err=sys.stderr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 780db07..257cbf7 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -27,6 +27,6 @@ def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
     graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
-    eng = engine.Engine(executors={executor.__class__: executor})
+    eng = engine.Engine(executor)
 
     eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 091e23c..6e9c950 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -49,7 +49,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/__init__.py b/tests/orchestrator/execution/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/execution/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/execution/test_execution_compiler.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py
new file mode 100644
index 0000000..42e5272
--- /dev/null
+++ b/tests/orchestrator/execution/test_execution_compiler.py
@@ -0,0 +1,628 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from threading import Thread, Event
+from datetime import datetime
+
+import pytest
+
+from aria.modeling import exceptions as modeling_exceptions
+from aria.modeling import models
+from aria.orchestrator import exceptions
+from aria.orchestrator import events
+from aria.orchestrator import execution_preparer
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+    workflow,
+    operation,
+)
+
+from tests import (
+    mock as tests_mock,
+    storage
+)
+
+from ...fixtures import (  # pylint: disable=unused-import
+    plugins_dir,
+    plugin_manager,
+    fs_model as model,
+    resource_storage as resource
+)
+
+custom_events = {
+    'is_resumed': Event(),
+    'is_active': Event(),
+    'execution_cancelled': Event(),
+    'execution_failed': Event(),
+}
+
+
+class TimeoutError(BaseException):
+    pass
+
+
+class FailingTask(BaseException):
+    pass
+
+
+def test_undeclared_workflow(request):
+    # validating a proper error is raised when the workflow is not declared in the service
+    with pytest.raises(exceptions.UndeclaredWorkflowError):
+        _get_preparer(request, 'undeclared_workflow').prepare()
+
+
+def test_missing_workflow_implementation(service, request):
+    # validating a proper error is raised when the workflow code path does not exist
+    workflow = models.Operation(
+        name='test_workflow',
+        service=service,
+        function='nonexistent.workflow.implementation')
+    service.workflows['test_workflow'] = workflow
+
+    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
+        _get_preparer(request, 'test_workflow').prepare()
+
+
+def test_builtin_workflow_instantiation(request):
+    # validates the workflow runner instantiates properly when provided with a builtin workflow
+    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+    workflow_ctx = _get_preparer(request, 'install').prepare()
+    assert len(workflow_ctx.execution.tasks) == 18  # expecting 18 tasks for 2 node topology
+
+
+def test_custom_workflow_instantiation(request):
+    # validates the workflow runner instantiates properly when provided with a custom workflow
+    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+    mock_workflow = _setup_mock_workflow_in_service(request)
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare()
+    assert len(workflow_ctx.execution.tasks) == 2   # mock workflow creates only start workflow
+                                                    # and end workflow task
+
+
+def test_existing_active_executions(request, service, model):
+    existing_active_execution = models.Execution(
+        service=service,
+        status=models.Execution.STARTED,
+        workflow_name='uninstall')
+    model.execution.put(existing_active_execution)
+    with pytest.raises(exceptions.ActiveExecutionsError):
+        _get_preparer(request, 'install').prepare()
+
+
+def test_existing_executions_but_no_active_ones(request, service, model):
+    existing_terminated_execution = models.Execution(
+        service=service,
+        status=models.Execution.SUCCEEDED,
+        workflow_name='uninstall')
+    model.execution.put(existing_terminated_execution)
+    # no active executions exist, so no error should be raised
+    _get_preparer(request, 'install').prepare()
+
+
+def test_execution_model_creation(request, service):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare()
+
+    assert workflow_ctx.execution.service.id == service.id
+    assert workflow_ctx.execution.workflow_name == mock_workflow
+    assert workflow_ctx.execution.created_at <= datetime.utcnow()
+    assert workflow_ctx.execution.inputs == dict()
+
+
+def test_execution_inputs_override_workflow_inputs(request):
+    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
+    mock_workflow = _setup_mock_workflow_in_service(
+        request,
+        inputs=dict((name, models.Input.wrap(name, val)) for name, val
+                    in wf_inputs.iteritems()))
+
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare(
+        execution_inputs={'input2': 'overriding-value2', 'input3': 7}
+    )
+
+    assert len(workflow_ctx.execution.inputs) == 3
+    # did not override input1 - expecting the default value from the workflow inputs
+    assert workflow_ctx.execution.inputs['input1'].value == 'value1'
+    # overrode input2
+    assert workflow_ctx.execution.inputs['input2'].value == 'overriding-value2'
+    # overrode input of integer type
+    assert workflow_ctx.execution.inputs['input3'].value == 7
+
+
+def test_execution_inputs_undeclared_inputs(request):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+        _get_preparer(request, mock_workflow).prepare(
+            execution_inputs={'undeclared_input': 'value'})
+
+
+def test_execution_inputs_missing_required_inputs(request):
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
+
+    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
+        _get_preparer(request, mock_workflow).prepare(execution_inputs={})
+
+
+def test_execution_inputs_wrong_type_inputs(request):
+    """
+    An execution input whose type differs from the declared one (an integer for an input
+    declared with a string value) must be rejected by ``prepare``.
+    """
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs={'input': models.Input.wrap('input', 'value')})
+
+    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
+        _get_preparer(request, mock_workflow).prepare(execution_inputs={'input': 5})
+
+
+def test_execution_inputs_builtin_workflow_with_inputs(request):
+    """Any execution input handed to the built-in ``install`` workflow is undeclared."""
+    # built-in workflows don't have inputs
+    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+        _get_preparer(request, 'install').prepare(execution_inputs={'undeclared_input': 'value'})
+
+
+def test_workflow_function_parameters(request, tmpdir):
+    """
+    The workflow function receives the merged (declared + overriding) execution inputs
+    as a plain dict of keyword arguments.
+    """
+    # validating the workflow function is passed with the
+    # merged execution inputs, in dict form
+
+    # the workflow function parameters will be written to this file
+    output_path = str(tmpdir.join('output'))
+    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
+
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
+                             in wf_inputs.iteritems()))
+
+    _get_preparer(request, mock_workflow).prepare(
+        execution_inputs={'input2': 'overriding-value2', 'input3': 7})
+
+    with open(output_path) as f:
+        wf_call_kwargs = json.load(f)
+    assert len(wf_call_kwargs) == 3
+    assert wf_call_kwargs.get('input1') == 'value1'
+    assert wf_call_kwargs.get('input2') == 'overriding-value2'
+    assert wf_call_kwargs.get('input3') == 7
+
+
+@pytest.fixture
+def service(model):
+    """Fixture: create a simple two-node topology in ``model`` and return its service."""
+    return model.service.get(tests_mock.topology.create_simple_topology_two_nodes(model))
+
+
+def _setup_mock_workflow_in_service(request, inputs=None):
+    """
+    Register a mock workflow named ``test_workflow`` on the ``service`` fixture.
+
+    The mock workflow module is uploaded to the service template's directory on the
+    resource storage, and every declared input is also exposed as an operation argument.
+
+    :param inputs: optional dict of input name -> ``models.Input``
+    :return: the name of the registered workflow
+    """
+    service = request.getfixturevalue('service')
+    resource = request.getfixturevalue('resource')
+
+    resource.service_template.upload(str(service.service_template.id),
+                                     tests_mock.workflow.__file__)
+
+    workflow_name = 'test_workflow'
+    declared_inputs = inputs or {}
+    arguments = dict((declared.name, declared.as_argument())
+                     for declared in declared_inputs.itervalues())
+    service.workflows[workflow_name] = models.Operation(
+        name=workflow_name,
+        service=service,
+        function='workflow.mock_workflow',
+        inputs=declared_inputs,
+        arguments=arguments)
+    return workflow_name
+
+
+def _get_preparer(request, workflow_name):
+    """Build an ``ExecutionPreparer`` for ``workflow_name`` out of the test's fixtures."""
+    service, model, resource, plugin_manager = [
+        request.getfixturevalue(fixture_name)
+        for fixture_name in ('service', 'model', 'resource', 'plugin_manager')
+    ]
+    return execution_preparer.ExecutionPreparer(
+        model, resource, plugin_manager, service, workflow_name)
+
+
+class TestResumableWorkflows(object):
+    """
+    Tests that cancel or fail an execution and then resume it with a fresh engine.
+
+    The tests and the mock operations of this module synchronize through the module-level
+    ``custom_events``, which are cleared after every test by ``register_to_events``.
+    """
+
+    def _prepare_execution_and_get_workflow_ctx(
+            self,
+            model,
+            resource,
+            service,
+            workflow,
+            executor,
+            inputs=None):
+        """
+        Register ``workflow`` on ``service`` as ``custom_workflow``, prepare an execution for
+        it and return the resulting workflow context.
+
+        ``inputs`` (a plain name -> value dict) are declared as the workflow's inputs and are
+        also handed to ``prepare`` as the execution inputs.
+        """
+        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+            'custom_workflow',
+            operation_kwargs={
+                'function': '{0}.{1}'.format(__name__, workflow.__name__),
+                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+            }
+        )
+        model.service.update(service)
+        preparer = execution_preparer.ExecutionPreparer(
+            model, resource, None, service, 'custom_workflow'
+        )
+        ctx = preparer.prepare(inputs, executor)
+        model.execution.update(ctx.execution)
+
+        return ctx
+
+    @staticmethod
+    def _cancel_active_execution(eng, ctx):
+        """
+        Wait (up to 60 seconds) for an operation to set ``is_active``, cancel the execution
+        and then wait (up to 60 seconds) for the cancellation signal.
+        """
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("is_active wasn't set to True")
+        eng.cancel_execution(ctx)
+        if custom_events['execution_cancelled'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+    def test_resume_workflow(self, workflow_context, thread_executor):
+        """A cancelled execution with one retrying task completes once it is resumed."""
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
+        ctx = self._prepare_execution_and_get_workflow_ctx(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'number_of_tasks': 2}
+        )
+
+        eng = engine.Engine(thread_executor)
+
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        # Wait for the execution to become active, then cancel it
+        self._cancel_active_execution(eng, ctx)
+        node = ctx.model.node.refresh(node)
+
+        tasks = ctx.model.task.list(filters={'_stub_type': None})
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
+        custom_events['is_resumed'].set()
+        # setting the event by itself is not expected to change the recorded task states
+        assert any(task.status == task.RETRYING for task in tasks)
+
+        # Create a new engine for the existing execution context; executing it with
+        # resuming=True picks the old execution up again.
+        new_engine = engine.Engine(thread_executor)
+        new_engine.execute(ctx, resuming=True)
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert node.attributes['invocations'].value == 3
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_started_task(self, workflow_context, thread_executor):
+        """A task left in STARTED state by a cancelled execution is run again on resume."""
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_stuck_task)
+
+        ctx = self._prepare_execution_and_get_workflow_ctx(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'number_of_tasks': 1}
+        )
+
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._cancel_active_execution(eng, ctx)
+        node = workflow_context.model.node.refresh(node)
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 1
+        assert task.status == task.STARTED
+        assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+        custom_events['is_resumed'].set()
+
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.SUCCESS
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_failed_task(self, workflow_context, thread_executor):
+        """
+        A task that was in the middle of its retries when the execution was cancelled
+        eventually succeeds after the execution is resumed.
+        """
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+        ctx = self._prepare_execution_and_get_workflow_ctx(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor)
+
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._cancel_active_execution(eng, ctx)
+        node = workflow_context.model.node.refresh(node)
+
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.STARTED
+        assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+
+        custom_events['is_resumed'].set()
+        assert node.attributes['invocations'].value == 2
+
+        # Create a new engine (with a new executor) for the existing execution context;
+        # executing it with resuming=True picks the old execution up again.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == task.max_attempts - 1
+        assert task.status == task.SUCCESS
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        """
+        Resuming a failed execution with ``retry_failed=True`` runs the failed task again,
+        with its attempts counter starting over.
+        """
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
+
+        ctx = self._prepare_execution_and_get_workflow_ctx(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
+        )
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 3
+        failed_task = [t for t in tasks if t.status == t.FAILED][0]
+
+        # One task failed, after using up both of its attempts
+        assert any(task.status == task.FAILED for task in tasks)
+        assert failed_task.attempts_count == 2
+        # The other task succeeded
+        assert any(task.status == task.SUCCESS for task in tasks)
+        # FAILED is a single status value, so compare for equality (``in`` would merely
+        # perform a substring test against it)
+        assert ctx.execution.status == ctx.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True, retry_failed=True)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert failed_task.attempts_count == 1
+        assert node.attributes['invocations'].value == 4
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+        """
+        Resuming a failed sequential execution without ``retry_failed`` leaves the failed
+        task as it is and runs the task that was still pending.
+        """
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+        ctx = self._prepare_execution_and_get_workflow_ctx(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_sequential_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+        )
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 1
+        assert any(t.status == t.FAILED for t in tasks)
+        assert any(t.status == t.PENDING for t in tasks)
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert any(t.status == t.SUCCESS for t in tasks)
+        assert any(t.status == t.FAILED for t in tasks)
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    @staticmethod
+    @pytest.fixture
+    def thread_executor():
+        """Fixture: a ``ThreadExecutor`` that is closed once the test is done."""
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @staticmethod
+    @pytest.fixture
+    def workflow_context(tmpdir):
+        """Fixture: a simple workflow context under ``tmpdir``; its storage is released after."""
+        workflow_context = tests_mock.context.simple(str(tmpdir))
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+    @staticmethod
+    def _create_interface(ctx, node, func, arguments=None):
+        """
+        Attach an ``aria.interfaces.lifecycle`` interface to ``node`` whose ``create``
+        operation points at ``func`` (looked up by name in this module), then update the node.
+
+        :return: tuple of (node, interface name, operation name)
+        """
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if arguments:
+            # the operation has to declare the arguments before those may be passed
+            operation_kwargs['arguments'] = arguments
+        operation_name = 'create'
+        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+                                                       operation_kwargs=operation_kwargs)
+        node.interfaces[interface.name] = interface
+        ctx.model.node.update(node)
+
+        return node, interface_name, operation_name
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor):
+        """
+        Compile the graph produced by ``workflow_func`` for ``executor``'s class and return an
+        engine wrapping ``executor``.
+        """
+        graph = workflow_func(ctx=workflow_context)
+        # keep the execution and re-assign it once the graph has been compiled
+        execution = workflow_context.execution
+        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+        workflow_context.execution = execution
+
+        return engine.Engine(executor)
+
+    @pytest.fixture(autouse=True)
+    def register_to_events(self):
+        """
+        Mirror the workflow cancelled/failure signals into ``custom_events`` for the duration
+        of each test, then disconnect the handlers and clear all of the events.
+        """
+        def execution_cancelled(*args, **kwargs):
+            custom_events['execution_cancelled'].set()
+
+        def execution_failed(*args, **kwargs):
+            custom_events['execution_failed'].set()
+
+        events.on_cancelled_workflow_signal.connect(execution_cancelled)
+        events.on_failure_workflow_signal.connect(execution_failed)
+        yield
+        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+        events.on_failure_workflow_signal.disconnect(execution_failed)
+        for event in custom_events.values():
+            event.clear()
+
+
+@workflow
+def mock_sequential_tasks_workflow(ctx, graph,
+                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
+    """
+    Workflow that chains ``number_of_tasks`` ``create`` operation tasks on the dependency
+    node through ``graph.sequence``.
+    """
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+@workflow
+def mock_parallel_tasks_workflow(ctx, graph,
+                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
+    """
+    Workflow that adds ``number_of_tasks`` ``create`` operation tasks on the dependency
+    node through ``graph.add_tasks``, with no explicit ordering between them.
+    """
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+    """
+    Return a list of ``number_of_tasks`` operation tasks, each invoking the ``create``
+    operation of the ``aria.interfaces.lifecycle`` interface on ``node``.
+    """
+    retry_settings = dict(retry_interval=retry_interval, max_attempts=max_attempts)
+    tasks = []
+    for _ in xrange(number_of_tasks):
+        tasks.append(
+            api.task.OperationTask(node, 'aria.interfaces.lifecycle', 'create', **retry_settings))
+    return tasks
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+    """
+    The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
+    Overall, the number of invocations should be ctx.task.max_attempts - 1.
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] == 2:
+        # let the test know the execution is active, so that it can cancel it
+        custom_events['is_active'].set()
+        # unfreeze the thread only when all of the invocations are done
+        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+            time.sleep(5)
+
+    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+        # pass only just before the end.
+        return
+    else:
+        # fail o.w.
+        raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+    """
+    Count the invocation, then block (polling every 5 seconds) until ``is_resumed`` is set,
+    making sure ``is_active`` is set while waiting.
+    """
+    ctx.node.attributes['invocations'] += 1
+    resumed = custom_events['is_resumed']
+    active = custom_events['is_active']
+    while not resumed.isSet():
+        if not active.isSet():
+            active.set()
+        time.sleep(5)
+
+
+@operation
+def mock_pass_first_task_only(ctx):
+    """
+    Count the invocation; the first invocation passes, and every later one fails for as
+    long as ``is_resumed`` has not been set.
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] != 1:
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
+            # if resume was called, increase by one. o/w fail the execution - second task
+            # should fail as long it was not a part of resuming the workflow
+            raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
+    """
+    Count the invocation; fail only when this is the first invocation and ``is_resumed``
+    has not been set yet.
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
+        raise FailingTask("First task should fail")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py 
b/tests/orchestrator/execution_plugin/test_local.py
index 7f33318..5af6a2f 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -501,7 +501,7 @@ if __name__ == '__main__':
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: 
disable=no-value-for-parameter
         graph_compiler.GraphCompiler(workflow_context, 
executor.__class__).compile(tasks_graph)
-        eng = engine.Engine({executor.__class__: executor})
+        eng = engine.Engine(executor)
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33534111/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py 
b/tests/orchestrator/execution_plugin/test_ssh.py
index b5df939..e39f3ba 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -263,7 +263,7 @@ class TestWithActualSSHServer(object):
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: 
disable=no-value-for-parameter
         graph_compiler.GraphCompiler(
             self._workflow_context, 
self._executor.__class__).compile(tasks_graph)
-        eng = engine.Engine({self._executor.__class__: self._executor})
+        eng = engine.Engine(self._executor)
         eng.execute(self._workflow_context)
         return self._workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

Reply via email to