Github user aviyoop commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152257452
  
    --- Diff: aria/orchestrator/execution_compiler.py ---
    @@ -0,0 +1,161 @@
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import os
    +import sys
    +from datetime import datetime
    +
    +from . import exceptions
    +from .context.workflow import WorkflowContext
    +from .workflows import builtin
    +from .workflows.core import graph_compiler
    +from .workflows.executor.process import ProcessExecutor
    +from ..modeling import models
    +from ..modeling import utils as modeling_utils
    +from ..utils.imports import import_fullname
    +
    +
    +DEFAULT_TASK_MAX_ATTEMPTS = 30
    +DEFAULT_TASK_RETRY_INTERVAL = 30
    +
    +
    +class ExecutionCompiler(object):
    +    def __init__(
    +            self,
    +            model,
    +            resource,
    +            plugin,
    +            service,
    +            workflow_name,
    +            task_max_attempts=None,
    +            task_retry_interval=None
    +    ):
    +        self._model = model
    +        self._resource = resource
    +        self._plugin = plugin
    +        self._service = service
    +        self._workflow_name = workflow_name
    +        self._workflow_context = None
    +        self._execution = None
    +        self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
    +        self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
    +
    +    @property
    +    def workflow_ctx(self):
    +        if self._workflow_context is None:
    +            self._workflow_context = WorkflowContext(
    +                name=self.__class__.__name__,
    +                model_storage=self._model,
    +                resource_storage=self._resource,
    +                service_id=self._execution.service.id,
    +                execution_id=self._execution.id,
    +                workflow_name=self._execution.workflow_name,
    +                task_max_attempts=self._task_max_attempts,
    +                task_retry_interval=self._task_retry_interval,
    +            )
    +        return self._workflow_context
    +
    +    def compile(self, execution_inputs=None, executor=None, execution_id=None):
    +        assert not (execution_inputs and executor and execution_id)
    +
    +        if execution_id is None:
    +            # If the execution is new
    +            self._execution = self._create_execution_model(execution_inputs)
    +            self._model.execution.put(self._execution)
    +            self._create_tasks(executor)
    +            self._model.execution.update(self._execution)
    +        else:
    +            # If resuming an execution
    +            self._execution = self._model.execution.get(execution_id)
    +
    +        return self.workflow_ctx
    +
    +    def _create_tasks(self, executor=None):
    +
    +        # Set default executor and kwargs
    +        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
    +
    +        # Transform the execution inputs into a dict to pass to the workflow function
    +        execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues())
    +
    +        if len(self._execution.tasks) == 0:
    +            workflow_fn = self._get_workflow_fn(self._execution.workflow_name)
    +            tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
    --- End diff ---
    
    api_tasks_graph - consider renaming tasks_graph, to make it explicit that the workflow function returns the API-level task graph rather than the compiled execution graph.
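    
    For illustration only, a minimal sketch of the rename in context; the GraphCompiler call below is not part of this diff and is an assumption about the follow-up step, based on the graph_compiler import above:
    
        # The workflow function builds the *API* task graph; the rename makes the
        # distinction from the compiled execution graph explicit.
        api_tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
    
        # Assumed follow-up step (not shown in the diff): compiling the API graph
        # into execution tasks via the imported graph_compiler module.
        graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__).compile(api_tasks_graph)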

