Github user aviyoop commented on a diff in the pull request: https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152257452 --- Diff: aria/orchestrator/execution_compiler.py --- @@ -0,0 +1,161 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +from datetime import datetime + +from . 
import exceptions +from .context.workflow import WorkflowContext +from .workflows import builtin +from .workflows.core import graph_compiler +from .workflows.executor.process import ProcessExecutor +from ..modeling import models +from ..modeling import utils as modeling_utils +from ..utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 30 +DEFAULT_TASK_RETRY_INTERVAL = 30 + + +class ExecutionCompiler(object): + def __init__( + self, + model, + resource, + plugin, + service, + workflow_name, + task_max_attempts=None, + task_retry_interval=None + ): + self._model = model + self._resource = resource + self._plugin = plugin + self._service = service + self._workflow_name = workflow_name + self._workflow_context = None + self._execution = None + self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS + self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL + + @property + def workflow_ctx(self): + if self._workflow_context is None: + self._workflow_context = WorkflowContext( + name=self.__class__.__name__, + model_storage=self._model, + resource_storage=self._resource, + service_id=self._execution.service.id, + execution_id=self._execution.id, + workflow_name=self._execution.workflow_name, + task_max_attempts=self._task_max_attempts, + task_retry_interval=self._task_retry_interval, + ) + return self._workflow_context + + def compile(self, execution_inputs=None, executor=None, execution_id=None): + assert not (execution_inputs and executor and execution_id) + + if execution_id is None: + # If the execution is new + self._execution = self._create_execution_model(execution_inputs) + self._model.execution.put(self._execution) + self._create_tasks(executor) + self._model.execution.update(self._execution) + else: + # If resuming an execution + self._execution = self._model.execution.get(execution_id) + + return self.workflow_ctx + + def _create_tasks(self, executor=None): + + # Set default executor and kwargs + 
executor = executor or ProcessExecutor(plugin_manager=self._plugin) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues()) + + if len(self._execution.tasks) == 0: + workflow_fn = self._get_workflow_fn(self._execution.workflow_name) + tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict) --- End diff -- Suggested rename: `tasks_graph` -> `api_tasks_graph`.
---