Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-14-workflow-engine-tests 5a77c4716 -> ff5098ca8
clean and add a test Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ff5098ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ff5098ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ff5098ca Branch: refs/heads/ARIA-14-workflow-engine-tests Commit: ff5098ca8e09f4a5a72d7b160d0c08902c282849 Parents: 5a77c47 Author: Dan Kilman <dankil...@gmail.com> Authored: Wed Nov 2 17:25:31 2016 +0200 Committer: Dan Kilman <dankil...@gmail.com> Committed: Wed Nov 2 17:25:31 2016 +0200 ---------------------------------------------------------------------- aria/workflows/core/engine.py | 3 +- tests/workflows/test_engine.py | 251 ++++++++++++++++++++---------------- 2 files changed, 138 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ff5098ca/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 5853c61..7a2431c 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -36,10 +36,9 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) self._workflow_context = workflow_context - self._tasks_graph = tasks_graph self._execution_graph = networkx.DiGraph() self._executor = executor - translation.build_execution_graph(task_graph=self._tasks_graph, + translation.build_execution_graph(task_graph=tasks_graph, workflow_context=workflow_context, execution_graph=self._execution_graph) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ff5098ca/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py index 6b37e7f..2488264 100644 --- a/tests/workflows/test_engine.py +++ b/tests/workflows/test_engine.py @@ -29,98 +29,164 @@ from aria.workflows.core import engine logging.basicConfig() -class TestEngine(object): +global_test_holder = {} + - def test_empty_graph_execution(self, workflow_context): - self.executor = thread.ThreadExecutor() +class TestEngine(object): + def test_empty_graph_execution(self, workflow_context, executor): @workflow def mock_workflow(context, graph): - return graph - - graph = mock_workflow(context=workflow_context) - eng = engine.Engine(executor=self.executor, - workflow_context=workflow_context, - tasks_graph=graph) - eng.execute() + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) assert workflow_context.states == ['start', 'success'] - def test_single_task_successful_execution(self, workflow_context): - self.executor = thread.ThreadExecutor() - + def test_single_task_successful_execution(self, workflow_context, executor): @workflow def mock_workflow(context, graph): - graph.add_task(context.operation( - name='failing', - operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=mock_success_task.__name__)}, - node_instance=None - )) - return graph - - graph = mock_workflow(context=workflow_context) - eng = engine.Engine(executor=self.executor, - workflow_context=workflow_context, - tasks_graph=graph) - execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)] - eng.execute() + graph.add_task(_op(mock_success_task, context)) + execution_tasks = self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) assert workflow_context.states == ['start', 'success'] assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks) - def test_single_task_failed_execution(self, workflow_context): - self.executor = thread.ThreadExecutor() - + def test_single_task_failed_execution(self, workflow_context, executor): @workflow def mock_workflow(context, graph): - graph.add_task(context.operation( - name='failing', - operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=mock_failed_task.__name__)}, - node_instance=None - )) - return graph - - graph = mock_workflow(context=workflow_context) - eng = engine.Engine(executor=self.executor, - workflow_context=workflow_context, - tasks_graph=graph) - execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)] + graph.add_task(_op(mock_failed_task, context)) with pytest.raises(RuntimeError): - eng.execute() + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) assert workflow_context.states == ['start', 'failure'] assert isinstance(workflow_context.exception, RuntimeError) - assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks) - - def setup_method(self): - events.start_workflow_signal.connect(start_workflow_handler) - events.on_success_workflow_signal.connect(success_workflow_handler) - events.on_failure_workflow_signal.connect(failure_workflow_handler) - events.sent_task_signal.connect(sent_task_handler) - - def teardown_method(self): - events.start_workflow_signal.disconnect(start_workflow_handler) - events.on_success_workflow_signal.disconnect(success_workflow_handler) - events.on_failure_workflow_signal.disconnect(failure_workflow_handler) - events.sent_task_signal.disconnect(sent_task_handler) - if self.executor: - self.executor.close() + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + op1 = _op(mock_ordered_task, context, inputs={'counter': 1}) + op2 = _op(mock_ordered_task, context, inputs={'counter': 2}) + graph.chain(tasks=[op1, op2]) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert global_test_holder.get('invocations') == [1, 2] -def sent_task_handler(task, *args, **kwargs): - task.sent = True + @staticmethod + def _execute(workflow_func, workflow_context, executor): + graph = workflow_func(context=workflow_context) + eng = engine.Engine(executor=executor, + workflow_context=workflow_context, + tasks_graph=graph) + execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)] + eng.execute() + return execution_tasks + @pytest.fixture(scope='function', autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() -def start_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('start') + @pytest.fixture(scope='function', autouse=True) + def signals_registration(self, ): + def sent_task_handler(task, *args, **kwargs): + task.sent = True + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') -def success_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('success') + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception -def failure_workflow_handler(workflow_context, exception, *args, **kwargs): - workflow_context.states.append('failure') - workflow_context.exception = exception + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture(scope='function') + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture(scope='function') + def workflow_context(self, tmpdir): + from dsl_parser.parser import parse_from_path + from dsl_parser.tasks import prepare_deployment_plan + blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}' + blueprint_dir = tmpdir.mkdir('blueprint') + blueprint_path = blueprint_dir.join('blueprint.yaml') + blueprint_path.write(blueprint) + blueprint_plan = parse_from_path(str(blueprint_path)) + blueprint_id = 'b1' + deployment_plan = prepare_deployment_plan(blueprint_plan.copy()) + deployment_id = 'd1' + work_dir = tmpdir.mkdir('work') + storage_dir = work_dir.mkdir('storage') + model_storage_dir = storage_dir.mkdir('model') + resource_storage_dir = storage_dir.mkdir('resource') + model_storage = aria.application_model_storage( + drivers.FileSystemModelDriver(str(model_storage_dir))) + resource_storage = aria.application_resource_storage( + drivers.FileSystemResourceDriver(str(resource_storage_dir))) + resource_storage.setup() + model_storage.setup() + storage_manager = application.StorageManager( + model_storage=model_storage, + resource_storage=resource_storage, + blueprint_path=blueprint_path, + blueprint_id=blueprint_id, + blueprint_plan=blueprint_plan, + deployment_id=deployment_id, + deployment_plan=deployment_plan + ) + storage_manager.create_blueprint_storage( + source=str(blueprint_path), + main_file_name='blueprint.yaml') + storage_manager.create_nodes_storage() + storage_manager.create_deployment_storage() + storage_manager.create_node_instances_storage() + result = contexts.WorkflowContext( + name='test', + model_storage=model_storage, + resource_storage=resource_storage, + deployment_id=deployment_id, + workflow_id='name') + result.states = [] + result.exception = None + return result + + +def _op(function, context, inputs=None): + return context.operation( + name='task', + node_instance=None, + operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( + name=function.__name__)}, + inputs=inputs + ) def mock_success_task(): @@ -131,49 +197,6 @@ def mock_failed_task(): raise RuntimeError -@pytest.fixture(scope='function') -def workflow_context(tmpdir): - from dsl_parser.parser import parse_from_path - from dsl_parser.tasks import prepare_deployment_plan - blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}' - blueprint_dir = tmpdir.mkdir('blueprint') - blueprint_path = blueprint_dir.join('blueprint.yaml') - blueprint_path.write(blueprint) - blueprint_plan = parse_from_path(str(blueprint_path)) - blueprint_id = 'b1' - deployment_plan = prepare_deployment_plan(blueprint_plan.copy()) - deployment_id = 'd1' - work_dir = tmpdir.mkdir('work') - storage_dir = work_dir.mkdir('storage') - model_storage_dir = storage_dir.mkdir('model') - resource_storage_dir = storage_dir.mkdir('resource') - model_storage = aria.application_model_storage( - drivers.FileSystemModelDriver(str(model_storage_dir))) - resource_storage = aria.application_resource_storage( - drivers.FileSystemResourceDriver(str(resource_storage_dir))) - resource_storage.setup() - model_storage.setup() - storage_manager = application.StorageManager( - model_storage=model_storage, - resource_storage=resource_storage, - blueprint_path=blueprint_path, - blueprint_id=blueprint_id, - blueprint_plan=blueprint_plan, - deployment_id=deployment_id, - deployment_plan=deployment_plan - ) - storage_manager.create_blueprint_storage( - source=str(blueprint_path), - main_file_name='blueprint.yaml') - storage_manager.create_nodes_storage() - storage_manager.create_deployment_storage() - storage_manager.create_node_instances_storage() - result = contexts.WorkflowContext( - name='test', - model_storage=model_storage, - resource_storage=resource_storage, - deployment_id=deployment_id, - workflow_id='name') - result.states = [] - result.exception = None - return result +def mock_ordered_task(counter): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter)