Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-14-workflow-engine-tests 5a77c4716 -> ff5098ca8


clean and add a test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ff5098ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ff5098ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ff5098ca

Branch: refs/heads/ARIA-14-workflow-engine-tests
Commit: ff5098ca8e09f4a5a72d7b160d0c08902c282849
Parents: 5a77c47
Author: Dan Kilman <dankil...@gmail.com>
Authored: Wed Nov 2 17:25:31 2016 +0200
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Wed Nov 2 17:25:31 2016 +0200

----------------------------------------------------------------------
 aria/workflows/core/engine.py  |   3 +-
 tests/workflows/test_engine.py | 251 ++++++++++++++++++++----------------
 2 files changed, 138 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ff5098ca/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 5853c61..7a2431c 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -36,10 +36,9 @@ class Engine(logger.LoggerMixin):
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
-        self._tasks_graph = tasks_graph
         self._execution_graph = networkx.DiGraph()
         self._executor = executor
-        translation.build_execution_graph(task_graph=self._tasks_graph,
+        translation.build_execution_graph(task_graph=tasks_graph,
                                           workflow_context=workflow_context,
                                           execution_graph=self._execution_graph)
 
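For reference, a minimal sketch of driving the refactored Engine, mirroring the
_execute helper added to the tests below. Only names that appear in this diff
are used; treat it as illustrative rather than canonical API documentation:

    # Illustrative sketch only: mirrors how the tests below drive the Engine.
    from aria.workflows.core import engine

    def run_workflow(workflow_func, workflow_context, executor):
        # @workflow-decorated functions take the context and return the
        # populated tasks graph.
        graph = workflow_func(context=workflow_context)
        # Engine now builds its internal execution graph eagerly in __init__
        # and no longer keeps a reference to the tasks graph itself.
        eng = engine.Engine(executor=executor,
                            workflow_context=workflow_context,
                            tasks_graph=graph)
        eng.execute()

An executor such as thread.ThreadExecutor() (created and closed by the new
executor fixture below) is passed in by the caller.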

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ff5098ca/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
index 6b37e7f..2488264 100644
--- a/tests/workflows/test_engine.py
+++ b/tests/workflows/test_engine.py
@@ -29,98 +29,164 @@ from aria.workflows.core import engine
 logging.basicConfig()
 
 
-class TestEngine(object):
+global_test_holder = {}
+
 
-    def test_empty_graph_execution(self, workflow_context):
-        self.executor = thread.ThreadExecutor()
+class TestEngine(object):
 
+    def test_empty_graph_execution(self, workflow_context, executor):
         @workflow
         def mock_workflow(context, graph):
-            return graph
-
-        graph = mock_workflow(context=workflow_context)
-        eng = engine.Engine(executor=self.executor,
-                            workflow_context=workflow_context,
-                            tasks_graph=graph)
-        eng.execute()
+            pass
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
         assert workflow_context.states == ['start', 'success']
 
-    def test_single_task_successful_execution(self, workflow_context):
-        self.executor = thread.ThreadExecutor()
-
+    def test_single_task_successful_execution(self, workflow_context, executor):
         @workflow
         def mock_workflow(context, graph):
-            graph.add_task(context.operation(
-                name='failing',
-                operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
-                    name=mock_success_task.__name__)},
-                node_instance=None
-            ))
-            return graph
-
-        graph = mock_workflow(context=workflow_context)
-        eng = engine.Engine(executor=self.executor,
-                            workflow_context=workflow_context,
-                            tasks_graph=graph)
-        execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)]
-        eng.execute()
+            graph.add_task(_op(mock_success_task, context))
+        execution_tasks = self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
         assert workflow_context.states == ['start', 'success']
         assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks)
 
-    def test_single_task_failed_execution(self, workflow_context):
-        self.executor = thread.ThreadExecutor()
-
+    def test_single_task_failed_execution(self, workflow_context, executor):
         @workflow
         def mock_workflow(context, graph):
-            graph.add_task(context.operation(
-                name='failing',
-                operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
-                    name=mock_failed_task.__name__)},
-                node_instance=None
-            ))
-            return graph
-
-        graph = mock_workflow(context=workflow_context)
-        eng = engine.Engine(executor=self.executor,
-                            workflow_context=workflow_context,
-                            tasks_graph=graph)
-        execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)]
+            graph.add_task(_op(mock_failed_task, context))
         with pytest.raises(RuntimeError):
-            eng.execute()
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
         assert workflow_context.states == ['start', 'failure']
         assert isinstance(workflow_context.exception, RuntimeError)
-        assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks)
-
-    def setup_method(self):
-        events.start_workflow_signal.connect(start_workflow_handler)
-        events.on_success_workflow_signal.connect(success_workflow_handler)
-        events.on_failure_workflow_signal.connect(failure_workflow_handler)
-        events.sent_task_signal.connect(sent_task_handler)
-
-    def teardown_method(self):
-        events.start_workflow_signal.disconnect(start_workflow_handler)
-        events.on_success_workflow_signal.disconnect(success_workflow_handler)
-        events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
-        events.sent_task_signal.disconnect(sent_task_handler)
-        if self.executor:
-            self.executor.close()
 
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(context, graph):
+            op1 = _op(mock_ordered_task, context, inputs={'counter': 1})
+            op2 = _op(mock_ordered_task, context, inputs={'counter': 2})
+            graph.chain(tasks=[op1, op2])
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert global_test_holder.get('invocations') == [1, 2]
 
-def sent_task_handler(task, *args, **kwargs):
-    task.sent = True
+    @staticmethod
+    def _execute(workflow_func, workflow_context, executor):
+        graph = workflow_func(context=workflow_context)
+        eng = engine.Engine(executor=executor,
+                            workflow_context=workflow_context,
+                            tasks_graph=graph)
+        execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)]
+        eng.execute()
+        return execution_tasks
 
+    @pytest.fixture(scope='function', autouse=True)
+    def globals_cleanup(self):
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
 
-def start_workflow_handler(workflow_context, *args, **kwargs):
-    workflow_context.states.append('start')
+    @pytest.fixture(scope='function', autouse=True)
+    def signals_registration(self):
+        def sent_task_handler(task, *args, **kwargs):
+            task.sent = True
 
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
 
-def success_workflow_handler(workflow_context, *args, **kwargs):
-    workflow_context.states.append('success')
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
 
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
 
-def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
-    workflow_context.states.append('failure')
-    workflow_context.exception = exception
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+        try:
+            yield
+        finally:
+            events.start_workflow_signal.disconnect(start_workflow_handler)
+            events.on_success_workflow_signal.disconnect(success_workflow_handler)
+            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            events.sent_task_signal.disconnect(sent_task_handler)
+
+    @pytest.fixture(scope='function')
+    def executor(self):
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @pytest.fixture(scope='function')
+    def workflow_context(self, tmpdir):
+        from dsl_parser.parser import parse_from_path
+        from dsl_parser.tasks import prepare_deployment_plan
+        blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}'
+        blueprint_dir = tmpdir.mkdir('blueprint')
+        blueprint_path = blueprint_dir.join('blueprint.yaml')
+        blueprint_path.write(blueprint)
+        blueprint_plan = parse_from_path(str(blueprint_path))
+        blueprint_id = 'b1'
+        deployment_plan = prepare_deployment_plan(blueprint_plan.copy())
+        deployment_id = 'd1'
+        work_dir = tmpdir.mkdir('work')
+        storage_dir = work_dir.mkdir('storage')
+        model_storage_dir = storage_dir.mkdir('model')
+        resource_storage_dir = storage_dir.mkdir('resource')
+        model_storage = aria.application_model_storage(
+            drivers.FileSystemModelDriver(str(model_storage_dir)))
+        resource_storage = aria.application_resource_storage(
+            drivers.FileSystemResourceDriver(str(resource_storage_dir)))
+        resource_storage.setup()
+        model_storage.setup()
+        storage_manager = application.StorageManager(
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            blueprint_path=blueprint_path,
+            blueprint_id=blueprint_id,
+            blueprint_plan=blueprint_plan,
+            deployment_id=deployment_id,
+            deployment_plan=deployment_plan
+        )
+        storage_manager.create_blueprint_storage(
+            source=str(blueprint_path),
+            main_file_name='blueprint.yaml')
+        storage_manager.create_nodes_storage()
+        storage_manager.create_deployment_storage()
+        storage_manager.create_node_instances_storage()
+        result = contexts.WorkflowContext(
+            name='test',
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            deployment_id=deployment_id,
+            workflow_id='name')
+        result.states = []
+        result.exception = None
+        return result
+
+
+def _op(function, context, inputs=None):
+    return context.operation(
+        name='task',
+        node_instance=None,
+        operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
+            name=function.__name__)},
+        inputs=inputs
+    )
 
 
 def mock_success_task():
@@ -131,49 +197,6 @@ def mock_failed_task():
     raise RuntimeError
 
 
-@pytest.fixture(scope='function')
-def workflow_context(tmpdir):
-    from dsl_parser.parser import parse_from_path
-    from dsl_parser.tasks import prepare_deployment_plan
-    blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}'
-    blueprint_dir = tmpdir.mkdir('blueprint')
-    blueprint_path = blueprint_dir.join('blueprint.yaml')
-    blueprint_path.write(blueprint)
-    blueprint_plan = parse_from_path(str(blueprint_path))
-    blueprint_id = 'b1'
-    deployment_plan = prepare_deployment_plan(blueprint_plan.copy())
-    deployment_id = 'd1'
-    work_dir = tmpdir.mkdir('work')
-    storage_dir = work_dir.mkdir('storage')
-    model_storage_dir = storage_dir.mkdir('model')
-    resource_storage_dir = storage_dir.mkdir('resource')
-    model_storage = aria.application_model_storage(
-        drivers.FileSystemModelDriver(str(model_storage_dir)))
-    resource_storage = aria.application_resource_storage(
-        drivers.FileSystemResourceDriver(str(resource_storage_dir)))
-    resource_storage.setup()
-    model_storage.setup()
-    storage_manager = application.StorageManager(
-        model_storage=model_storage,
-        resource_storage=resource_storage,
-        blueprint_path=blueprint_path,
-        blueprint_id=blueprint_id,
-        blueprint_plan=blueprint_plan,
-        deployment_id=deployment_id,
-        deployment_plan=deployment_plan
-    )
-    storage_manager.create_blueprint_storage(
-        source=str(blueprint_path),
-        main_file_name='blueprint.yaml')
-    storage_manager.create_nodes_storage()
-    storage_manager.create_deployment_storage()
-    storage_manager.create_node_instances_storage()
-    result = contexts.WorkflowContext(
-        name='test',
-        model_storage=model_storage,
-        resource_storage=resource_storage,
-        deployment_id=deployment_id,
-        workflow_id='name')
-    result.states = []
-    result.exception = None
-    return result
+def mock_ordered_task(counter):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(counter)

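Worth noting in the test refactor above: the module-level signal handlers
previously wired up in setup_method/teardown_method become autouse yield
fixtures, so registration and cleanup live together and teardown runs even
when a test raises. A generic sketch of that pattern (names here are
illustrative only, not from this commit):

    import pytest

    handlers = []

    @pytest.fixture(autouse=True)
    def handler_registration():
        def handler(sender, *args, **kwargs):
            pass
        handlers.append(handler)        # setup: runs before each test
        try:
            yield                       # the test body executes here
        finally:
            handlers.remove(handler)    # teardown: runs even if the test fails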