GitHub user aviyoop commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152283978
  
    --- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
    @@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request, inputs=None):
         return mock_workflow_name
     
     
    -def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
    -                            task_max_attempts=None, task_retry_interval=None):
    +def _get_compiler(request, workflow_name):
         # helper method for instantiating a workflow runner
    -    service_id = request.getfixturevalue('service').id
    +    service = request.getfixturevalue('service')
         model = request.getfixturevalue('model')
         resource = request.getfixturevalue('resource')
         plugin_manager = request.getfixturevalue('plugin_manager')
     
    -    # task configuration parameters can't be set to None, therefore only
    -    # passing those if they've been set by the test
    -    task_configuration_kwargs = dict()
    -    if task_max_attempts is not None:
    -        task_configuration_kwargs['task_max_attempts'] = task_max_attempts
    -    if task_retry_interval is not None:
    -        task_configuration_kwargs['task_retry_interval'] = task_retry_interval
    -
    -    return WorkflowRunner(
    -        workflow_name=workflow_name,
    -        service_id=service_id,
    -        inputs=inputs or {},
    -        executor=executor,
    -        model_storage=model,
    -        resource_storage=resource,
    -        plugin_manager=plugin_manager,
    -        **task_configuration_kwargs)
    +    return execution_compiler.ExecutionCompiler(
    +        model,
    +        resource,
    +        plugin_manager,
    +        service,
    +        workflow_name
    +    )
     
     
     class TestResumableWorkflows(object):
     
    -    def _create_initial_workflow_runner(
    -            self, workflow_context, workflow, executor, inputs=None):
    +    def _compile_execution(
    +            self,
    +            model,
    +            resource,
    +            service,
    +            workflow,
    +            executor,
    +            inputs=None):
     
    -        service = workflow_context.service
             service.workflows['custom_workflow'] = tests_mock.models.create_operation(
                 'custom_workflow',
                 operation_kwargs={
                     'function': '{0}.{1}'.format(__name__, workflow.__name__),
                     'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
                 }
             )
    -        workflow_context.model.service.update(service)
    -
    -        wf_runner = WorkflowRunner(
    -            service_id=workflow_context.service.id,
    -            inputs=inputs or {},
    -            model_storage=workflow_context.model,
    -            resource_storage=workflow_context.resource,
    -            plugin_manager=None,
    -            workflow_name='custom_workflow',
    -            executor=executor)
    -        return wf_runner
    +        model.service.update(service)
    +        compiler = execution_compiler.ExecutionCompiler(
    +            model, resource, None, service, 'custom_workflow'
    +        )
    +        ctx = compiler.compile(inputs, executor)
    +        model.execution.update(ctx.execution)
    +
    +        return ctx
     
         @staticmethod
    -    def _wait_for_active_and_cancel(workflow_runner):
    +    def _wait_for_active_and_cancel(eng, ctx):
             if custom_events['is_active'].wait(60) is False:
                 raise TimeoutError("is_active wasn't set to True")
    -        workflow_runner.cancel()
    +        eng.cancel_execution(ctx)
             if custom_events['execution_cancelled'].wait(60) is False:
                 raise TimeoutError("Execution did not end")
     
         def test_resume_workflow(self, workflow_context, thread_executor):
            node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
            node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
            self._create_interface(workflow_context, node, mock_pass_first_task_only)
    +        ctx = self._compile_execution(
    +            workflow_context.model,
    +            workflow_context.resource,
    +            workflow_context.model.service.list()[0],
    +            mock_parallel_tasks_workflow,
    +            thread_executor,
    +            inputs={'number_of_tasks': 2}
    +        )
     
    -        wf_runner = self._create_initial_workflow_runner(
    -            workflow_context, mock_parallel_tasks_workflow, thread_executor,
    -            inputs={'number_of_tasks': 2})
    +        eng = engine.Engine(thread_executor)
     
    -        wf_thread = Thread(target=wf_runner.execute)
    +        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
             wf_thread.daemon = True
             wf_thread.start()
     
             # Wait for the execution to start
    -        self._wait_for_active_and_cancel(wf_runner)
    -        node = workflow_context.model.node.refresh(node)
    +        self._wait_for_active_and_cancel(eng, ctx)
    +        node = ctx.model.node.refresh(node)
     
    -        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
    +        tasks = ctx.model.task.list(filters={'_stub_type': None})
             assert any(task.status == task.SUCCESS for task in tasks)
             assert any(task.status == task.RETRYING for task in tasks)
             custom_events['is_resumed'].set()
             assert any(task.status == task.RETRYING for task in tasks)
     
            # Create a new workflow runner, with an existing execution id. This would cause
    --- End diff ---
    
    This comment needs to be revised: it still talks about creating a new workflow runner with an existing execution id, but this PR removes WorkflowRunner in favor of ExecutionCompiler and engine.Engine.
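    
    For reference, the resume step could be rewritten along these lines (a rough
    sketch only, based on the engine.Engine / compiled-ctx API visible in this diff;
    the thread.ThreadExecutor import path, the new_thread_executor name and the
    resuming keyword are assumptions on my part, not taken from this change):
    
        # resume the already-compiled execution with a fresh executor and engine,
        # rather than constructing a second WorkflowRunner as the old comment says
        # (assumed import: from aria.orchestrator.workflows.executor import thread)
        new_thread_executor = thread.ThreadExecutor()
        try:
            new_engine = engine.Engine(new_thread_executor)
            # 'resuming=True' is assumed to be how Engine.execute continues an
            # existing execution; adjust to whatever the engine actually exposes
            new_engine.execute(ctx=ctx, resuming=True)
        finally:
            new_thread_executor.close()
    
        assert ctx.execution.status == ctx.execution.SUCCEEDED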

