Github user aviyoop commented on a diff in the pull request: https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152283978 --- Diff: tests/orchestrator/execution/test_execution_compiler.py --- @@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request, inputs=None): return mock_workflow_name -def _create_workflow_runner(request, workflow_name, inputs=None, executor=None, - task_max_attempts=None, task_retry_interval=None): +def _get_compiler(request, workflow_name): # helper method for instantiating a workflow runner - service_id = request.getfixturevalue('service').id + service = request.getfixturevalue('service') model = request.getfixturevalue('model') resource = request.getfixturevalue('resource') plugin_manager = request.getfixturevalue('plugin_manager') - # task configuration parameters can't be set to None, therefore only - # passing those if they've been set by the test - task_configuration_kwargs = dict() - if task_max_attempts is not None: - task_configuration_kwargs['task_max_attempts'] = task_max_attempts - if task_retry_interval is not None: - task_configuration_kwargs['task_retry_interval'] = task_retry_interval - - return WorkflowRunner( - workflow_name=workflow_name, - service_id=service_id, - inputs=inputs or {}, - executor=executor, - model_storage=model, - resource_storage=resource, - plugin_manager=plugin_manager, - **task_configuration_kwargs) + return execution_compiler.ExecutionCompiler( + model, + resource, + plugin_manager, + service, + workflow_name + ) class TestResumableWorkflows(object): - def _create_initial_workflow_runner( - self, workflow_context, workflow, executor, inputs=None): + def _compile_execution( + self, + model, + resource, + service, + workflow, + executor, + inputs=None): - service = workflow_context.service service.workflows['custom_workflow'] = tests_mock.models.create_operation( 'custom_workflow', operation_kwargs={ 'function': '{0}.{1}'.format(__name__, workflow.__name__), 'inputs': dict((k, 
models.Input.wrap(k, v)) for k, v in (inputs or {}).items()) } ) - workflow_context.model.service.update(service) - - wf_runner = WorkflowRunner( - service_id=workflow_context.service.id, - inputs=inputs or {}, - model_storage=workflow_context.model, - resource_storage=workflow_context.resource, - plugin_manager=None, - workflow_name='custom_workflow', - executor=executor) - return wf_runner + model.service.update(service) + compiler = execution_compiler.ExecutionCompiler( + model, resource, None, service, 'custom_workflow' + ) + ctx = compiler.compile(inputs, executor) + model.execution.update(ctx.execution) + + return ctx @staticmethod - def _wait_for_active_and_cancel(workflow_runner): + def _wait_for_active_and_cancel(eng, ctx): if custom_events['is_active'].wait(60) is False: raise TimeoutError("is_active wasn't set to True") - workflow_runner.cancel() + eng.cancel_execution(ctx) if custom_events['execution_cancelled'].wait(60) is False: raise TimeoutError("Execution did not end") def test_resume_workflow(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_pass_first_task_only) + ctx = self._compile_execution( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_parallel_tasks_workflow, + thread_executor, + inputs={'number_of_tasks': 2} + ) - wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_parallel_tasks_workflow, thread_executor, - inputs={'number_of_tasks': 2}) + eng = engine.Engine(thread_executor) - wf_thread = Thread(target=wf_runner.execute) + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.daemon = True wf_thread.start() # Wait for the execution to start - self._wait_for_active_and_cancel(wf_runner) - node = workflow_context.model.node.refresh(node) + 
self._wait_for_active_and_cancel(eng, ctx) + node = ctx.model.node.refresh(node) - tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + tasks = ctx.model.task.list(filters={'_stub_type': None}) assert any(task.status == task.SUCCESS for task in tasks) assert any(task.status == task.RETRYING for task in tasks) custom_events['is_resumed'].set() assert any(task.status == task.RETRYING for task in tasks) # Create a new workflow runner, with an existing execution id. This would cause --- End diff -- This comment needs to be revised: it still refers to creating a new workflow runner, but this change replaces `WorkflowRunner` with the execution compiler and engine.
---