wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/907ed6eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/907ed6eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/907ed6eb Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 907ed6eb5d7364573ef8143a646471b5546a7f8e Parents: 2149a5e Author: max-orlov <ma...@gigaspaces.com> Authored: Sun Jun 11 19:05:35 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Jun 14 17:29:12 2017 +0300 ---------------------------------------------------------------------- aria/modeling/mixins.py | 4 +- aria/modeling/orchestration.py | 150 +++++++--- aria/orchestrator/context/operation.py | 3 + aria/orchestrator/workflow_runner.py | 118 +++++++- aria/orchestrator/workflows/core/__init__.py | 2 +- aria/orchestrator/workflows/core/_task.py | 267 ++++++++++++++++++ aria/orchestrator/workflows/core/engine.py | 67 +++-- .../workflows/core/events_handler.py | 77 +++--- aria/orchestrator/workflows/core/task.py | 271 ------------------- aria/orchestrator/workflows/core/translation.py | 109 -------- aria/orchestrator/workflows/events_logging.py | 25 +- aria/orchestrator/workflows/executor/base.py | 32 ++- aria/orchestrator/workflows/executor/process.py | 18 +- aria/orchestrator/workflows/executor/thread.py | 18 +- tests/orchestrator/context/__init__.py | 8 +- tests/orchestrator/context/test_operation.py | 26 +- tests/orchestrator/context/test_serialize.py | 8 +- .../orchestrator/execution_plugin/test_local.py | 11 +- .../orchestrator/workflows/core/test_engine.py | 14 +- .../orchestrator/workflows/core/test_events.py | 10 +- .../test_task_graph_into_execution_graph.py | 52 ++-- .../orchestrator/workflows/executor/__init__.py | 58 ++-- .../workflows/executor/test_executor.py | 95 +++---- .../workflows/executor/test_process_executor.py | 1 - 24 files changed, 751 insertions(+), 693 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/modeling/mixins.py ---------------------------------------------------------------------- diff --git a/aria/modeling/mixins.py b/aria/modeling/mixins.py index c98a866..31675fe 100644 --- a/aria/modeling/mixins.py +++ b/aria/modeling/mixins.py @@ -18,14 +18,12 @@ classes: * ModelMixin - abstract model implementation. * ModelIDMixin - abstract model implementation with IDs. """ - from sqlalchemy.ext import associationproxy from sqlalchemy import ( Column, Integer, Text, - PickleType -) + PickleType) from ..parser.consumption import ConsumptionContext from ..utils import console, collections, caching, formatting http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 995c8c2..c0b7f04 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -21,9 +21,10 @@ classes: """ # pylint: disable=no-self-argument, no-member, abstract-method - +from contextlib import contextmanager from datetime import datetime +from networkx import DiGraph from sqlalchemy import ( Column, Integer, @@ -34,19 +35,19 @@ from sqlalchemy import ( String, Float, orm, -) + PickleType) from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declared_attr from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException) -from .mixins import ModelMixin, ParameterMixin +from . import mixins from . import ( relationship, types as modeling_types ) -class ExecutionBase(ModelMixin): +class ExecutionBase(mixins.ModelMixin): """ Execution model representation. """ @@ -152,7 +153,7 @@ class ExecutionBase(ModelMixin): ) -class PluginBase(ModelMixin): +class PluginBase(mixins.ModelMixin): """ An installed plugin. @@ -213,7 +214,7 @@ class PluginBase(ModelMixin): uploaded_at = Column(DateTime, nullable=False, index=True) -class TaskBase(ModelMixin): +class TaskBase(mixins.ModelMixin): """ Represents the smallest unit of stateful execution in ARIA. The task state includes inputs, outputs, as well as an atomic status, ensuring that the task can only be running once at any @@ -257,10 +258,25 @@ class TaskBase(ModelMixin): __tablename__ = 'task' - __private_fields__ = ['node_fk', - 'relationship_fk', - 'plugin_fk', - 'execution_fk'] + __private_fields__ = ['dependency_operation_task_fk', 'dependency_stub_task_fk', 'node_fk', + 'relationship_fk', 'plugin_fk', 'execution_fk'] + + + START_WORKFLOW = 'start_workflow' + END_WORKFLOW = 'end_workflow' + START_SUBWROFKLOW = 'start_subworkflow' + END_SUBWORKFLOW = 'end_subworkflow' + STUB = 'stub' + CONDITIONAL = 'conditional' + + STUB_TYPES = ( + START_WORKFLOW, + START_SUBWROFKLOW, + END_WORKFLOW, + END_SUBWORKFLOW, + STUB, + CONDITIONAL, + ) PENDING = 'pending' RETRYING = 'retrying' @@ -276,10 +292,27 @@ class TaskBase(ModelMixin): SUCCESS, FAILED, ) - INFINITE_RETRIES = -1 @declared_attr + def execution(cls): + return relationship.many_to_one(cls, 'execution') + + @declared_attr + def execution_fk(cls): + return relationship.foreign_key('execution', nullable=True) + + status = Column(Enum(*STATES, name='status'), default=PENDING) + due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow()) + started_at = Column(DateTime, default=None) + ended_at = Column(DateTime, default=None) + attempts_count = Column(Integer, default=1) + api_id = Column(String) + + _executor = Column(PickleType) + _context_cls = Column(PickleType) + + @declared_attr def logs(cls): return relationship.one_to_many(cls, 'log') @@ -296,10 +329,6 @@ class TaskBase(ModelMixin): return relationship.many_to_one(cls, 'plugin') @declared_attr - def execution(cls): - return relationship.many_to_one(cls, 'execution') - - @declared_attr def arguments(cls): return relationship.one_to_many(cls, 'argument', dict_key='name') @@ -307,19 +336,10 @@ class TaskBase(ModelMixin): max_attempts = Column(Integer, default=1) retry_interval = Column(Float, default=0) ignore_failure = Column(Boolean, default=False) + interface_name = Column(String) + operation_name = Column(String) - # State - status = Column(Enum(*STATES, name='status'), default=PENDING) - due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow()) - started_at = Column(DateTime, default=None) - ended_at = Column(DateTime, default=None) - attempts_count = Column(Integer, default=1) - - def has_ended(self): - return self.status in (self.SUCCESS, self.FAILED) - - def is_waiting(self): - return self.status in (self.PENDING, self.RETRYING) + stub_type = Column(Enum(*STUB_TYPES)) @property def actor(self): @@ -351,10 +371,6 @@ class TaskBase(ModelMixin): def plugin_fk(cls): return relationship.foreign_key('plugin', nullable=True) - @declared_attr - def execution_fk(cls): - return relationship.foreign_key('execution', nullable=True) - # endregion # region association proxies @@ -376,14 +392,6 @@ class TaskBase(ModelMixin): # endregion - @classmethod - def for_node(cls, actor, **kwargs): - return cls(node=actor, **kwargs) - - @classmethod - def for_relationship(cls, actor, **kwargs): - return cls(relationship=actor, **kwargs) - @staticmethod def abort(message=None): raise TaskAbortException(message) @@ -392,8 +400,68 @@ class TaskBase(ModelMixin): def retry(message=None, retry_interval=None): raise TaskRetryException(message, retry_interval=retry_interval) + @declared_attr + def dependency_fk(self): + return relationship.foreign_key('task', nullable=True) -class LogBase(ModelMixin): + @declared_attr + def dependencies(cls): + # symmetric relationship causes funky graphs + return relationship.one_to_many_self(cls, 'dependency_fk') + + def has_ended(self): + if self.stub_type is not None: + return self.status == self.SUCCESS + else: + return self.status in (self.SUCCESS, self.FAILED) + + def is_waiting(self): + if self.stub_type: + return not self.has_ended() + else: + return self.status in (self.PENDING, self.RETRYING) + + @classmethod + def from_api_task(cls, api_task, executor, **kwargs): + from aria.orchestrator import context + instantiation_kwargs = {} + + if hasattr(api_task.actor, 'outbound_relationships'): + context_cls = context.operation.NodeOperationContext + instantiation_kwargs['node'] = api_task.actor + elif hasattr(api_task.actor, 'source_node'): + context_cls = context.operation.RelationshipOperationContext + instantiation_kwargs['relationship'] = api_task.actor + else: + raise RuntimeError('No operation context could be created for {actor.model_cls}' + .format(actor=api_task.actor)) + + instantiation_kwargs.update( + { + 'name': api_task.name, + 'status': cls.PENDING, + 'max_attempts': api_task.max_attempts, + 'retry_interval': api_task.retry_interval, + 'ignore_failure': api_task.ignore_failure, + 'execution': api_task._workflow_context.execution, + 'interface_name': api_task.interface_name, + 'operation_name': api_task.operation_name, + + # Only non-stub tasks have these fields + 'plugin': api_task.plugin, + 'function': api_task.function, + 'arguments': api_task.arguments, + 'api_id': api_task.id, + '_context_cls': context_cls, + '_executor': executor, + }) + + instantiation_kwargs.update(**kwargs) + + return cls(**instantiation_kwargs) + + +class LogBase(mixins.ModelMixin): __tablename__ = 'log' @@ -435,7 +503,7 @@ class LogBase(ModelMixin): return '{name}: {self.msg}'.format(name=name, self=self) -class ArgumentBase(ParameterMixin): +class ArgumentBase(mixins.ParameterMixin): __tablename__ = 'argument' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index efdc04d..7477912 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -58,6 +58,9 @@ class BaseOperationContext(common.BaseContext): self._thread_local.task = self.model.task.get(self._task_id) return self._thread_local.task + def update_task(self): + self.model.task.update(self.task) + @property def plugin_workdir(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 848c59b..f09cb79 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -21,11 +21,15 @@ import os import sys from datetime import datetime +from networkx import DiGraph + from . import exceptions from .context.workflow import WorkflowContext from .workflows import builtin from .workflows.core.engine import Engine from .workflows.executor.process import ProcessExecutor +from .workflows.executor.base import StubTaskExecutor +from .workflows.api import task from ..modeling import models from ..modeling import utils as modeling_utils from ..utils.imports import import_fullname @@ -80,15 +84,21 @@ class WorkflowRunner(object): task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval) + # Set default executor and kwargs + executor = executor or ProcessExecutor(plugin_manager=plugin_manager) + # transforming the execution inputs to dict, to pass them to the workflow function execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) + self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict) + construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__) - executor = executor or ProcessExecutor(plugin_manager=plugin_manager) - self._engine = Engine( - executor=executor, - workflow_context=workflow_context, - tasks_graph=self._tasks_graph) + # Update the state + self._model_storage.execution.update(execution) + + self._engine = Engine(executor=executor, + workflow_context=workflow_context, + execution_graph=get_execution_graph(self.execution)) @property def execution_id(self): @@ -166,3 +176,101 @@ class WorkflowRunner(object): self._workflow_name, workflow.function)) return workflow_fn + + +def get_execution_graph(execution): + graph = DiGraph() + for task in execution.tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) + + return graph + + +def construct_execution_tasks(execution, + task_graph, + default_executor, + stub_executor=StubTaskExecutor, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + depends_on = list(depends_on) + + # Insert start marker + start_task = models.Task(api_id=_start_graph_suffix(task_graph.id), + _executor=stub_executor, + execution=execution, + stub_type=start_stub_type, + dependencies=depends_on) + + for api_task in task_graph.topological_order(reverse=True): + operation_dependencies = _get_tasks_from_dependencies(execution, + task_graph.get_dependencies(api_task), [start_task]) + + if isinstance(api_task, task.OperationTask): + models.Task.from_api_task(api_task=api_task, + executor=default_executor, + dependencies=operation_dependencies) + + elif isinstance(api_task, task.WorkflowTask): + # Build the graph recursively while adding start and end markers + construct_execution_tasks( + execution=execution, + task_graph=api_task, + default_executor=default_executor, + stub_executor=stub_executor, + start_stub_type=models.Task.START_SUBWROFKLOW, + end_stub_type=models.Task.END_SUBWORKFLOW, + depends_on=operation_dependencies + ) + elif isinstance(api_task, task.StubTask): + models.Task(api_id=api_task.id, + _executor=stub_executor, + execution=execution, + stub_type=models.Task.STUB, + dependencies=operation_dependencies) + else: + raise + + # Insert end marker + models.Task(api_id=_end_graph_suffix(task_graph.id), + _executor=stub_executor, + execution=execution, + stub_type=end_stub_type, + dependencies=_get_non_dependent_tasks(execution) or [start_task]) + + +def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + +def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + +def _get_non_dependent_tasks(execution): + dependency_tasks = set() + for task in execution.tasks: + dependency_tasks.update(task.dependencies) + return list(set(execution.tasks) - set(dependency_tasks)) + + +def _get_tasks_from_dependencies(execution, dependencies, default=()): + """ + Returns task list from dependencies. + """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + dependency_name = dependency.id + else: + dependency_name = _end_graph_suffix(dependency.id) + tasks.extend(task for task in execution.tasks if task.api_id == dependency_name) + return tasks or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py index e377153..81db43f 100644 --- a/aria/orchestrator/workflows/core/__init__.py +++ b/aria/orchestrator/workflows/core/__init__.py @@ -17,4 +17,4 @@ Core for the workflow execution mechanism """ -from . import task, translation, engine +from . import engine http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/_task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/_task.py b/aria/orchestrator/workflows/core/_task.py new file mode 100644 index 0000000..399c177 --- /dev/null +++ b/aria/orchestrator/workflows/core/_task.py @@ -0,0 +1,267 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow tasks +""" + +from contextlib import contextmanager +from datetime import datetime +from functools import ( + partial, + wraps, +) + + +from ....modeling import models +from ...context import operation as operation_context +from .. import exceptions + + +def _locked(func=None): + if func is None: + return partial(_locked, func=_locked) + + @wraps(func) + def _wrapper(self, value, **kwargs): + if self._update_fields is None: + raise exceptions.TaskException('Task is not in update mode') + return func(self, value, **kwargs) + return _wrapper + + +class BaseTask(object): + """ + Base class for Task objects + """ + + def __init__(self, id, executor, *args, **kwargs): + super(BaseTask, self).__init__(*args, **kwargs) + self._id = id + self._executor = executor + + def execute(self): + return self._executor.execute(self) + + @property + def id(self): + """ + :return: the task's id + """ + return self._id + + +class StubTask(BaseTask): + """ + Base stub task for marker user tasks that only mark the start/end of a workflow + or sub-workflow + """ + STARTED = models.Task.STARTED + SUCCESS = models.Task.SUCCESS + + def __init__(self, *args, **kwargs): + super(StubTask, self).__init__(*args, **kwargs) + self.status = models.Task.PENDING + self.due_at = datetime.utcnow() + + + + +class StartWorkflowTask(StubTask): + """ + Task marking a workflow start + """ + pass + + +class EndWorkflowTask(StubTask): + """ + Task marking a workflow end + """ + pass + + +class StartSubWorkflowTask(StubTask): + """ + Task marking a subworkflow start + """ + pass + + +class EndSubWorkflowTask(StubTask): + """ + Task marking a subworkflow end + """ + pass + + +class OperationTask(BaseTask): + """ + Operation task + """ + def __init__(self, api_task, *args, **kwargs): + # If no executor is provided, we infer that this is an empty task which does not need to be + # executed. + super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs) + self._workflow_context = api_task._workflow_context + self.interface_name = api_task.interface_name + self.operation_name = api_task.operation_name + model_storage = api_task._workflow_context.model + + actor = getattr(api_task.actor, '_wrapped', api_task.actor) + + base_task_model = model_storage.task.model_cls + if isinstance(actor, models.Node): + context_cls = operation_context.NodeOperationContext + create_task_model = base_task_model.for_node + elif isinstance(actor, models.Relationship): + context_cls = operation_context.RelationshipOperationContext + create_task_model = base_task_model.for_relationship + else: + raise RuntimeError('No operation context could be created for {actor.model_cls}' + .format(actor=actor)) + + task_model = create_task_model( + name=api_task.name, + actor=actor, + status=base_task_model.PENDING, + max_attempts=api_task.max_attempts, + retry_interval=api_task.retry_interval, + ignore_failure=api_task.ignore_failure, + execution=self._workflow_context.execution, + + # Only non-stub tasks have these fields + plugin=api_task.plugin, + function=api_task.function, + arguments=api_task.arguments + ) + self._workflow_context.model.task.put(task_model) + + self._ctx = context_cls(name=api_task.name, + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + service_id=self._workflow_context._service_id, + task_id=task_model.id, + actor_id=actor.id, + execution_id=self._workflow_context._execution_id, + workdir=self._workflow_context._workdir) + self._task_id = task_model.id + self._update_fields = None + + @contextmanager + def _update(self): + """ + A context manager which puts the task into update mode, enabling fields update. + :yields: None + """ + self._update_fields = {} + try: + yield + for key, value in self._update_fields.items(): + setattr(self.model_task, key, value) + self.model_task = self.model_task + finally: + self._update_fields = None + + @property + def model_task(self): + """ + Returns the task model in storage + :return: task in storage + """ + return self._workflow_context.model.task.get(self._task_id) + + @model_task.setter + def model_task(self, value): + self._workflow_context.model.task.put(value) + + @property + def context(self): + """ + Contexts for the operation + :return: + """ + return self._ctx + + @property + def status(self): + """ + Returns the task status + :return: task status + """ + return self.model_task.status + + @status.setter + @_locked + def status(self, value): + self._update_fields['status'] = value + + @property + def started_at(self): + """ + Returns when the task started + :return: when task started + """ + return self.model_task.started_at + + @started_at.setter + @_locked + def started_at(self, value): + self._update_fields['started_at'] = value + + @property + def ended_at(self): + """ + Returns when the task ended + :return: when task ended + """ + return self.model_task.ended_at + + @ended_at.setter + @_locked + def ended_at(self, value): + self._update_fields['ended_at'] = value + + @property + def attempts_count(self): + """ + Returns the attempts count for the task + :return: attempts count + """ + return self.model_task.attempts_count + + @attempts_count.setter + @_locked + def attempts_count(self, value): + self._update_fields['attempts_count'] = value + + @property + def due_at(self): + """ + Returns the minimum datetime in which the task can be executed + :return: eta + """ + return self.model_task.due_at + + @due_at.setter + @_locked + def due_at(self, value): + self._update_fields['due_at'] = value + + def __getattr__(self, attr): + try: + return getattr(self.model_task, attr) + except AttributeError: + return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 3a96804..e1b6412 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -20,15 +20,12 @@ The workflow engine. Executes workflows import time from datetime import datetime -import networkx - from aria import logger from aria.modeling import models from aria.orchestrator import events +from aria.orchestrator.context import operation from .. import exceptions -from . import task as engine_task -from . import translation # Import required so all signals are registered from . import events_handler # pylint: disable=unused-import @@ -38,13 +35,11 @@ class Engine(logger.LoggerMixin): The workflow engine. Executes workflows """ - def __init__(self, executor, workflow_context, tasks_graph, **kwargs): + def __init__(self, executor, workflow_context, execution_graph, **kwargs): super(Engine, self).__init__(**kwargs) self._workflow_context = workflow_context - self._execution_graph = networkx.DiGraph() - translation.build_execution_graph(task_graph=tasks_graph, - execution_graph=self._execution_graph, - default_executor=executor) + self._executors = {executor.__class__: executor} + self._execution_graph = execution_graph def execute(self): """ @@ -79,43 +74,57 @@ class Engine(logger.LoggerMixin): will be modified to 'cancelled' directly. """ events.on_cancelling_workflow_signal.send(self._workflow_context) + self._workflow_context.execution = self._workflow_context.execution def _is_cancel(self): - return self._workflow_context.execution.status in (models.Execution.CANCELLING, - models.Execution.CANCELLED) + execution = self._workflow_context.model.execution.update(self._workflow_context.execution) + return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) def _executable_tasks(self): now = datetime.utcnow() - return (task for task in self._tasks_iter() - if task.is_waiting() and - task.due_at <= now and - not self._task_has_dependencies(task)) + return ( + task for task in self._tasks_iter() + if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task) + ) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.has_ended()) + return (task for task in self._tasks_iter() + if task.has_ended() and task in self._execution_graph) def _task_has_dependencies(self, task): - return len(self._execution_graph.pred.get(task.id, {})) > 0 + return task.dependencies and all(d in self._execution_graph for d in task.dependencies) def _all_tasks_consumed(self): return len(self._execution_graph.node) == 0 def _tasks_iter(self): - for _, data in self._execution_graph.nodes_iter(data=True): - task = data['task'] - if isinstance(task, engine_task.OperationTask): - if not task.model_task.has_ended(): - self._workflow_context.model.task.refresh(task.model_task) - yield task - - @staticmethod - def _handle_executable_task(task): - if isinstance(task, engine_task.OperationTask): + for task in self._workflow_context.execution.tasks: + yield self._workflow_context.model.task.refresh(task) + + def _handle_executable_task(self, task): + if not task.stub_type: events.sent_task_signal.send(task) - task.execute() + + if task._executor not in self._executors: + self._executors[task._executor] = task._executor() + executor = self._executors[task._executor] + + context_cls = task._context_cls or operation.BaseOperationContext + op_ctx = context_cls( + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + workdir=self._workflow_context._workdir, + task_id=task.id, + actor_id=task.actor.id if task.actor else None, + service_id=task.execution.service.id, + execution_id=task.execution.id, + name=task.name + ) + + executor.execute(op_ctx) def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: - self._execution_graph.remove_node(task.id) + self._execution_graph.remove_node(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 669fb43..8b217f5 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -31,50 +31,47 @@ from ... import exceptions @events.sent_task_signal.connect def _task_sent(task, *args, **kwargs): - with task._update(): - task.status = task.SENT + task.status = task.SENT @events.start_task_signal.connect -def _task_started(task, *args, **kwargs): - with task._update(): - task.started_at = datetime.utcnow() - task.status = task.STARTED - _update_node_state_if_necessary(task, is_transitional=True) +def _task_started(ctx, *args, **kwargs): + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + _update_node_state_if_necessary(ctx, is_transitional=True) @events.on_failure_task_signal.connect -def _task_failed(task, exception, *args, **kwargs): - with task._update(): - should_retry = all([ - not isinstance(exception, exceptions.TaskAbortException), - task.attempts_count < task.max_attempts or task.max_attempts == task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retries and it will be marked - # as failed. The engine will also look at ignore_failure so it won't fail the - # workflow. - not task.ignore_failure - ]) - if should_retry: - retry_interval = None - if isinstance(exception, exceptions.TaskRetryException): - retry_interval = exception.retry_interval - if retry_interval is None: - retry_interval = task.retry_interval - task.status = task.RETRYING - task.attempts_count += 1 - task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) - else: - task.ended_at = datetime.utcnow() - task.status = task.FAILED +def _task_failed(ctx, exception, *args, **kwargs): + should_retry = all([ + not isinstance(exception, exceptions.TaskAbortException), + ctx.task.attempts_count < ctx.task.max_attempts or + ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, + # ignore_failure check here means the task will not be retried and it will be marked + # as failed. The engine will also look at ignore_failure so it won't fail the + # workflow. + not ctx.task.ignore_failure + ]) + if should_retry: + retry_interval = None + if isinstance(exception, exceptions.TaskRetryException): + retry_interval = exception.retry_interval + if retry_interval is None: + retry_interval = ctx.task.retry_interval + ctx.task.status = ctx.task.RETRYING + ctx.task.attempts_count += 1 + ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) + else: + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.FAILED @events.on_success_task_signal.connect -def _task_succeeded(task, *args, **kwargs): - with task._update(): - task.ended_at = datetime.utcnow() - task.status = task.SUCCESS +def _task_succeeded(ctx, *args, **kwargs): + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS - _update_node_state_if_necessary(task) + _update_node_state_if_necessary(ctx) @events.start_workflow_signal.connect @@ -133,17 +130,17 @@ def _workflow_cancelling(workflow_context, *args, **kwargs): workflow_context.execution = execution -def _update_node_state_if_necessary(task, is_transitional=False): +def _update_node_state_if_necessary(ctx, is_transitional=False): # TODO: this is not the right way to check! the interface name is arbitrary # and also will *never* be the type name - model_task = task.model_task - node = model_task.node if model_task is not None else None + node = ctx.task.node if ctx.task is not None else None if (node is not None) and \ - (task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')): - state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional) + (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')): + state = node.determine_state(op_name=ctx.task.operation_name, + is_transitional=is_transitional) if state: node.state = state - task.context.model.node.update(node) + ctx.model.node.update(node) def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py deleted file mode 100644 index d732f09..0000000 --- a/aria/orchestrator/workflows/core/task.py +++ /dev/null @@ -1,271 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow tasks -""" - -from contextlib import contextmanager -from datetime import datetime -from functools import ( - partial, - wraps, -) - - -from ....modeling import models -from ...context import operation as operation_context -from .. import exceptions - - -def _locked(func=None): - if func is None: - return partial(_locked, func=_locked) - - @wraps(func) - def _wrapper(self, value, **kwargs): - if self._update_fields is None: - raise exceptions.TaskException('Task is not in update mode') - return func(self, value, **kwargs) - return _wrapper - - -class BaseTask(object): - """ - Base class for Task objects - """ - - def __init__(self, id, executor, *args, **kwargs): - super(BaseTask, self).__init__(*args, **kwargs) - self._id = id - self._executor = executor - - def execute(self): - return self._executor.execute(self) - - @property - def id(self): - """ - :return: the task's id - """ - return self._id - - -class StubTask(BaseTask): - """ - Base stub task for marker user tasks that only mark the start/end of a workflow - or sub-workflow - """ - STARTED = models.Task.STARTED - SUCCESS = models.Task.SUCCESS - - def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(*args, **kwargs) - self.status = models.Task.PENDING - self.due_at = datetime.utcnow() - - def has_ended(self): - return self.status == self.SUCCESS - - def is_waiting(self): - return not self.has_ended() - - -class StartWorkflowTask(StubTask): - """ - Task marking a workflow start - """ - pass - - -class EndWorkflowTask(StubTask): - """ - Task marking a workflow end - """ - pass - - -class StartSubWorkflowTask(StubTask): - """ - Task marking a subworkflow start - """ - pass - - -class EndSubWorkflowTask(StubTask): - """ - Task marking a subworkflow end - """ - pass - - -class OperationTask(BaseTask): - """ - Operation task - """ - def __init__(self, api_task, *args, **kwargs): - # If no executor is provided, we infer that this is an empty task which does not need to be - # executed. - super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs) - self._workflow_context = api_task._workflow_context - self.interface_name = api_task.interface_name - self.operation_name = api_task.operation_name - model_storage = api_task._workflow_context.model - - actor = getattr(api_task.actor, '_wrapped', api_task.actor) - - base_task_model = model_storage.task.model_cls - if isinstance(actor, models.Node): - context_cls = operation_context.NodeOperationContext - create_task_model = base_task_model.for_node - elif isinstance(actor, models.Relationship): - context_cls = operation_context.RelationshipOperationContext - create_task_model = base_task_model.for_relationship - else: - raise RuntimeError('No operation context could be created for {actor.model_cls}' - .format(actor=actor)) - - task_model = create_task_model( - name=api_task.name, - actor=actor, - status=base_task_model.PENDING, - max_attempts=api_task.max_attempts, - retry_interval=api_task.retry_interval, - ignore_failure=api_task.ignore_failure, - execution=self._workflow_context.execution, - - # Only non-stub tasks have these fields - plugin=api_task.plugin, - function=api_task.function, - arguments=api_task.arguments - ) - self._workflow_context.model.task.put(task_model) - - self._ctx = context_cls(name=api_task.name, - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - service_id=self._workflow_context._service_id, - task_id=task_model.id, - actor_id=actor.id, - execution_id=self._workflow_context._execution_id, - workdir=self._workflow_context._workdir) - self._task_id = task_model.id - self._update_fields = None - - @contextmanager - def _update(self): - """ - A context manager which puts the task into update mode, enabling fields update. - :yields: None - """ - self._update_fields = {} - try: - yield - for key, value in self._update_fields.items(): - setattr(self.model_task, key, value) - self.model_task = self.model_task - finally: - self._update_fields = None - - @property - def model_task(self): - """ - Returns the task model in storage - :return: task in storage - """ - return self._workflow_context.model.task.get(self._task_id) - - @model_task.setter - def model_task(self, value): - self._workflow_context.model.task.put(value) - - @property - def context(self): - """ - Contexts for the operation - :return: - """ - return self._ctx - - @property - def status(self): - """ - Returns the task status - :return: task status - """ - return self.model_task.status - - @status.setter - @_locked - def status(self, value): - self._update_fields['status'] = value - - @property - def started_at(self): - """ - Returns when the task started - :return: when task started - """ - return self.model_task.started_at - - @started_at.setter - @_locked - def started_at(self, value): - self._update_fields['started_at'] = value - - @property - def ended_at(self): - """ - Returns when the task ended - :return: when task ended - """ - return self.model_task.ended_at - - @ended_at.setter - @_locked - def ended_at(self, value): - self._update_fields['ended_at'] = value - - @property - def attempts_count(self): - """ - Returns the attempts count for the task - :return: attempts count - """ - return self.model_task.attempts_count - - @attempts_count.setter - @_locked - def attempts_count(self, value): - self._update_fields['attempts_count'] = value - - @property - def due_at(self): - """ - Returns the minimum datetime in which the task can be executed - :return: eta - """ - return self.model_task.due_at - - @due_at.setter - @_locked - def due_at(self, value): - self._update_fields['due_at'] = value - - def __getattr__(self, attr): - try: - return getattr(self.model_task, attr) - except AttributeError: - return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py deleted file mode 100644 index fec108b..0000000 --- a/aria/orchestrator/workflows/core/translation.py +++ /dev/null @@ -1,109 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Translation of user graph's API to the execution graph -""" - -from .. import api -from ..executor import base -from . import task as core_task - - -def build_execution_graph( - task_graph, - execution_graph, - default_executor, - start_cls=core_task.StartWorkflowTask, - end_cls=core_task.EndWorkflowTask, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param workflow_context: The workflow - :param execution_graph: The execution graph that is being built - :param start_cls: internal use - :param end_cls: internal use - :param depends_on: internal use - """ - # Insert start marker - start_task = start_cls(id=_start_graph_suffix(task_graph.id), executor=base.StubTaskExecutor()) - _add_task_and_dependencies(execution_graph, start_task, depends_on) - - for api_task in task_graph.topological_order(reverse=True): - dependencies = task_graph.get_dependencies(api_task) - operation_dependencies = _get_tasks_from_dependencies( - execution_graph, dependencies, default=[start_task]) - - if isinstance(api_task, api.task.OperationTask): - operation_task = core_task.OperationTask(api_task, executor=default_executor) - _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) - elif isinstance(api_task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - build_execution_graph( - task_graph=api_task, - execution_graph=execution_graph, - default_executor=default_executor, - start_cls=core_task.StartSubWorkflowTask, - end_cls=core_task.EndSubWorkflowTask, - depends_on=operation_dependencies - ) - elif isinstance(api_task, api.task.StubTask): - stub_task = core_task.StubTask(id=api_task.id, executor=base.StubTaskExecutor()) - _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies( - execution_graph, - _get_non_dependency_tasks(task_graph), - default=[start_task]) - end_task = end_cls(id=_end_graph_suffix(task_graph.id), executor=base.StubTaskExecutor()) - _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) - - -def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): - execution_graph.add_node(operation_task.id, task=operation_task) - for dependency in operation_dependencies: - execution_graph.add_edge(dependency.id, operation_task.id) - - -def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)): - dependency_id = dependency.id - else: - dependency_id = _end_graph_suffix(dependency.id) - tasks.append(execution_graph.node[dependency_id]['task']) - return tasks or default - - -def _start_graph_suffix(id): - return '{0}-Start'.format(id) - - -def _end_graph_suffix(id): - return '{0}-End'.format(id) - - -def _get_non_dependency_tasks(graph): - for task in graph.tasks: - if len(list(graph.get_dependents(task))) == 0: - yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 036c1f7..4cee867 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -34,31 +34,32 @@ def _get_task_name(task): @events.start_task_signal.connect -def _start_task_handler(task, **kwargs): +def _start_task_handler(ctx, **kwargs): # If the task has no function this is an empty task. - if task.function: + if ctx.task.function: suffix = 'started...' - logger = task.context.logger.info + logger = ctx.logger.info else: suffix = 'has no implementation' - logger = task.context.logger.debug + logger = ctx.logger.debug logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format( - name=_get_task_name(task), task=task, suffix=suffix)) + name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix)) + @events.on_success_task_signal.connect -def _success_task_handler(task, **kwargs): - if not task.function: +def _success_task_handler(ctx, **kwargs): + if not ctx.task.function: return - task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' - .format(name=_get_task_name(task), task=task)) + ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful' + .format(name=_get_task_name(ctx.task), task=ctx.task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, traceback, **kwargs): - task.context.logger.error( +def _failure_operation_handler(ctx, traceback, **kwargs): + ctx.logger.error( '{name} {task.interface_name}.{task.operation_name} failed' - .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) + .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 7fece6f..fc4b800 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -28,19 +28,20 @@ class BaseExecutor(logger.LoggerMixin): def _execute(self, task): raise NotImplementedError - def execute(self, task): + def execute(self, ctx): """ Execute a task :param task: task to execute """ - if task.function: - self._execute(task) + ctx.update_task() + if ctx.task.function: + self._execute(ctx) else: # In this case the task is missing a function. This task still gets to an # executor, but since there is nothing to run, we by default simply skip the execution # itself. - self._task_started(task) - self._task_succeeded(task) + self._task_started(ctx) + self._task_succeeded(ctx) def close(self): """ @@ -49,18 +50,19 @@ class BaseExecutor(logger.LoggerMixin): pass @staticmethod - def _task_started(task): - events.start_task_signal.send(task) + def _task_started(ctx): + events.start_task_signal.send(ctx) + ctx.update_task() - @staticmethod - def _task_failed(task, exception, traceback=None): - events.on_failure_task_signal.send(task, exception=exception, traceback=traceback) + def _task_failed(self, ctx, exception, traceback=None): + events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) + ctx.update_task() - @staticmethod - def _task_succeeded(task): - events.on_success_task_signal.send(task) + def _task_succeeded(self, ctx): + events.on_success_task_signal.send(ctx) + ctx.update_task() class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method - def execute(self, task): - task.status = task.SUCCESS + def execute(self, ctx, *args, **kwargs): + ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 634f1f2..8518b33 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -113,17 +113,17 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) - def _execute(self, task): + def _execute(self, ctx): self._check_closed() - self._tasks[task.id] = task + self._tasks[ctx.task.id] = ctx # Temporary file used to pass arguments to the started subprocess file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') os.close(file_descriptor) with open(arguments_json_path, 'wb') as f: - f.write(pickle.dumps(self._create_arguments_dict(task))) + f.write(pickle.dumps(self._create_arguments_dict(ctx))) - env = self._construct_subprocess_env(task=task) + env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess subprocess.Popen( '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), @@ -137,13 +137,13 @@ class ProcessExecutor(base.BaseExecutor): if self._stopped: raise RuntimeError('Executor closed') - def _create_arguments_dict(self, task): + def _create_arguments_dict(self, ctx): return { - 'task_id': task.id, - 'function': task.function, - 'operation_arguments': dict(arg.unwrapped for arg in task.arguments.values()), + 'task_id': ctx.task.id, + 'function': ctx.task.function, + 'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.values()), 'port': self._server_port, - 'context': task.context.serialization_dict, + 'context': ctx.serialization_dict, } def _construct_subprocess_env(self, task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 56a56a5..8c447b6 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -46,8 +46,8 @@ class ThreadExecutor(BaseExecutor): thread.start() self._pool.append(thread) - def _execute(self, task): - self._queue.put(task) + def _execute(self, ctx): + self._queue.put(ctx) def close(self): self._stopped = True @@ -57,15 +57,15 @@ class ThreadExecutor(BaseExecutor): def _processor(self): while not self._stopped: try: - task = self._queue.get(timeout=1) - self._task_started(task) + ctx = self._queue.get(timeout=1) + self._task_started(ctx) try: - task_func = imports.load_attribute(task.function) - arguments = dict(arg.unwrapped for arg in task.arguments.values()) - task_func(ctx=task.context, **arguments) - self._task_succeeded(task) + task_func = imports.load_attribute(ctx.task.function) + arguments = dict(arg.unwrapped for arg in ctx.task.arguments.values()) + task_func(ctx=ctx, **arguments) + self._task_succeeded(ctx) except BaseException as e: - self._task_failed(task, + self._task_failed(ctx, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) # Daemon threads http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 4fde0a7..cb282a3 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -16,6 +16,7 @@ import sys from aria.orchestrator.workflows.core import engine +from aria.orchestrator import workflow_runner def op_path(func, module_path=None): @@ -25,5 +26,10 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + + workflow_runner.construct_execution_tasks(workflow_context.execution, graph, executor.__class__) + workflow_context.execution = workflow_context.execution + execution_graph = workflow_runner.get_execution_graph(workflow_context.execution) + eng = engine.Engine(executor, workflow_context, execution_graph) + eng.execute() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 3dcfaa2..f654fe5 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -50,21 +50,12 @@ def ctx(tmpdir): @pytest.fixture -def process_executor(): - ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR)) - try: - yield ex - finally: - ex.close() - - -@pytest.fixture def thread_executor(): - ex = thread.ThreadExecutor() + result = thread.ThreadExecutor() try: - yield ex + yield result finally: - ex.close() + result.close() @pytest.fixture @@ -266,12 +257,12 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): - executor_cls, executor_kwargs = request.param - result = executor_cls(**executor_kwargs) + ex_cls, kwargs = request.param + ex = ex_cls(**kwargs) try: - yield result + yield ex finally: - result.close() + ex.close() def test_node_operation_logging(ctx, executor): @@ -304,7 +295,6 @@ def test_node_operation_logging(ctx, executor): arguments=arguments ) ) - execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) _assert_loggins(ctx, arguments) @@ -418,7 +408,7 @@ def _assert_loggins(ctx, arguments): assert len(executions) == 1 execution = executions[0] - tasks = ctx.model.task.list() + tasks = ctx.model.task.list(filters={'stub_type': None}) assert len(tasks) == 1 task = tasks[0] assert len(task.logs) == 4 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 0919e81..aa19f56 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -18,7 +18,7 @@ import pytest from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine from aria.orchestrator.workflows.executor import process -from aria.orchestrator import workflow, operation +from aria.orchestrator import workflow, operation, workflow_runner import tests from tests import mock from tests import storage @@ -33,6 +33,12 @@ def test_serialize_operation_context(context, executor, tmpdir): test_file.write(TEST_FILE_CONTENT) resource = context.resource resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) + graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__) + context.execution = context.execution + execution_graph = workflow_runner.get_execution_graph(context.execution) + eng = engine.Engine(executor, context, execution_graph) + eng.execute() node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) plugin = mock.models.create_plugin() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index f667460..99a0cb6 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -19,7 +19,7 @@ import os import pytest from aria import workflow -from aria.orchestrator import events +from aria.orchestrator import events, workflow_runner from aria.orchestrator.workflows import api from aria.orchestrator.workflows.exceptions import ExecutorException from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException @@ -500,10 +500,11 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - eng = engine.Engine( - executor=executor, - workflow_context=workflow_context, - tasks_graph=tasks_graph) + workflow_runner.construct_execution_tasks( + workflow_context.execution, tasks_graph, executor.__class__) + workflow_context.execution = workflow_context.execution + execution_graph = workflow_runner.get_execution_graph(workflow_context.execution) + eng = engine.Engine(executor, workflow_context, execution_graph) eng.execute() return workflow_context.model.node.get_by_name( mock.models.DEPENDENCY_NODE_NAME).attributes http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 0438544..8bcf01e 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -24,6 +24,7 @@ from aria.orchestrator import ( operation, ) from aria.modeling import models +from aria.orchestrator import workflow_runner from aria.orchestrator.workflows import ( api, exceptions, @@ -50,9 +51,13 @@ class BaseTest(object): @staticmethod def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) + execution = workflow_context.execution + workflow_runner.construct_execution_tasks(execution, graph, executor.__class__) + workflow_context.execution = execution + return engine.Engine(executor=executor, workflow_context=workflow_context, - tasks_graph=graph) + execution_graph=workflow_runner.get_execution_graph(execution)) @staticmethod def _create_interface(ctx, func, arguments=None): @@ -98,9 +103,10 @@ class BaseTest(object): @pytest.fixture(autouse=True) def signals_registration(self, ): - def sent_task_handler(*args, **kwargs): - calls = global_test_holder.setdefault('sent_task_signal_calls', 0) - global_test_holder['sent_task_signal_calls'] = calls + 1 + def sent_task_handler(task, *args, **kwargs): + if task.stub_type is None: + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 def start_workflow_handler(workflow_context, *args, **kwargs): workflow_context.states.append('start') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 6d542e9..92582a9 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -15,6 +15,7 @@ import pytest +from aria.orchestrator import workflow_runner from tests import mock, storage from aria.modeling.service_instance import NodeBase from aria.orchestrator.decorators import operation, workflow @@ -112,13 +113,14 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_name=op_name, operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface + workflow_runner.construct_execution_tasks( + ctx.execution, single_operation_workflow( + ctx=ctx, node=node, interface_name=interface_name, op_name=op_name), ThreadExecutor) + ctx.execution = ctx.execution eng = engine.Engine(executor=ThreadExecutor(), workflow_context=ctx, - tasks_graph=single_operation_workflow(ctx=ctx, # pylint: disable=no-value-for-parameter - node=node, - interface_name=interface_name, - op_name=op_name)) + execution_graph=workflow_runner.get_execution_graph(ctx.execution)) eng.execute() return node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 5dd2855..aebae38 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from networkx import topological_sort, DiGraph - -from aria.orchestrator import context -from aria.orchestrator.workflows import api, core +from networkx import topological_sort + +from aria.modeling import models +from aria.orchestrator import ( + context, + workflow_runner +) +from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import base - from tests import mock from tests import storage @@ -65,10 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(simple_after_task, inner_task_graph) # Direct check - execution_graph = DiGraph() - core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph, - default_executor=base.StubTaskExecutor()) + execution = task_context.model.execution.list()[0] + + workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor) + task_context.execution = execution + + execution_graph = workflow_runner.get_execution_graph(execution) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7 @@ -83,30 +88,23 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == execution_tasks - - assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), - core.task.StartWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), - simple_before_task) - assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), - core.task.StartSubWorkflowTask) + assert expected_tasks_names == [t.api_id for t in execution_tasks] + assert all(isinstance(task, models.Task) for task in execution_tasks) + execution_tasks = iter(execution_tasks) - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), - inner_task) - assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), - core.task.EndSubWorkflowTask) + assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_before_task) + assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW + _assert_execution_is_api_task(next(execution_tasks), inner_task) + assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_after_task) + assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), - simple_after_task) - assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), - core.task.EndWorkflowTask) storage.release_sqlite_storage(task_context.model) def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task.id == api_task.id + assert execution_task.api_id == api_task.id assert execution_task.name == api_task.name assert execution_task.function == api_task.function assert execution_task.actor == api_task.actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ac6d325..b8032b7 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,68 +12,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import uuid import logging -from collections import namedtuple -from contextlib import contextmanager import aria -from aria.modeling import models - - -class MockTask(object): - - INFINITE_RETRIES = models.Task.INFINITE_RETRIES - - def __init__(self, function, arguments=None, plugin=None, storage=None): - self.function = self.name = function - self.plugin_fk = plugin.id if plugin else None - self.plugin = plugin or None - self.arguments = arguments or {} - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - self.logger = logging.getLogger() - self.context = MockContext(storage) - self.attempts_count = 1 - self.max_attempts = 1 - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.actor = namedtuple('actor', 'name')(name='actor_name') - self.model_task = None - - for state in models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self class MockContext(object): - def __init__(self, storage=None): + def __init__(self, storage, **kwargs): self.logger = logging.getLogger('mock_logger') - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) self.model = storage + task = storage.task.model_cls(**kwargs) + self.model.task.put(task) + self._task_id = task.id + self.states = [] + self.exception = None + + @property + def task(self): + return self.model.task.get(self._task_id) @property def serialization_dict(self): if self.model: - return {'context': self.model.serialization_dict, 'context_cls': self.__class__} + context = self.model.serialization_dict + context['task_id'] = self.task_id + return {'context': context, 'context_cls': self.__class__} else: - return {'context_cls': self.__class__, 'context': {}} + return {'context_cls': self.__class__, 'context': {'task': self.task_id}} def __getattr__(self, item): return None @classmethod - def instantiate_from_dict(cls, **kwargs): + def instantiate_from_dict(cls, task_id, **kwargs): if kwargs: - return cls(storage=aria.application_model_storage(**kwargs)) + return cls(task_id=task_id, storage=aria.application_model_storage(**kwargs)) else: - return cls() + return cls(task=task_id) @staticmethod def close(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 3079c60..410a982 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -17,6 +17,8 @@ import pytest import retrying +from tests import mock, storage + try: import celery as _celery app = _celery.Celery() @@ -25,7 +27,6 @@ except ImportError: _celery = None app = None -import aria from aria.modeling import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( @@ -35,41 +36,50 @@ from aria.orchestrator.workflows.executor import ( ) import tests -from . import MockTask def _get_function(func): return '{module}.{func.__name__}'.format(module=__name__, func=func) -def execute_and_assert(executor, storage=None): +def execute_and_assert(executor, ctx): + node = ctx.model.node.list()[0] + execution = ctx.model.execution.list()[0] expected_value = 'value' - successful_task = MockTask(_get_function(mock_successful_task), storage=storage) - failing_task = MockTask(_get_function(mock_failing_task), storage=storage) - task_with_inputs = MockTask(_get_function(mock_task_with_input), - arguments={'input': models.Argument.wrap('input', 'value')}, - storage=storage) - - for task in [successful_task, failing_task, task_with_inputs]: - executor.execute(task) + successful_ctx = models.Task(function=_get_function(mock_successful_task), + node=node, _executor=executor, execution=execution) + failing_ctx = models.Task( + function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution) + ctx_with_inputs = models.Task( + node=node, + function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}, + _executor=executor, + execution=execution) + + ctx.model.execution.update(execution) + + for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]: + op_ctx.states = [] + op_ctx.execute(ctx) @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value + assert successful_ctx.states == ['start', 'success'] + assert failing_ctx.states == ['start', 'failure'] + assert ctx_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_ctx.exception, MockException) + assert isinstance(ctx_with_inputs.exception, MockException) + assert ctx_with_inputs.exception.message == expected_value assertion() -def test_thread_execute(thread_executor): - execute_and_assert(thread_executor) +def test_thread_execute(thread_executor, ctx): + execute_and_assert(thread_executor, ctx) -def test_process_execute(process_executor, storage): - execute_and_assert(process_executor, storage) +def test_process_execute(process_executor, ctx): + execute_and_assert(process_executor, ctx) def mock_successful_task(**_): @@ -94,25 +104,16 @@ class MockException(Exception): @pytest.fixture -def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) - - -@pytest.fixture(params=[ - (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), - # subprocess needs to load a tests module so we explicitly add the root directory as if - # the project has been installed in editable mode - # (celery.CeleryExecutor, {'app': app}) -]) -def thread_executor(request): - executor_cls, executor_kwargs = request.param - result = executor_cls(**executor_kwargs) - yield result - result.close() +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + ctx.states = [] + yield context + storage.release_sqlite_storage(context.model) + + +@pytest.fixture +def thread_executor(): + return thread.ThreadExecutor @pytest.fixture @@ -124,15 +125,15 @@ def process_executor(): @pytest.fixture(autouse=True) def register_signals(): - def start_handler(task, *args, **kwargs): - task.states.append('start') + def start_handler(ctx, *args, **kwargs): + ctx.states.append('start') - def success_handler(task, *args, **kwargs): - task.states.append('success') + def success_handler(ctx, *args, **kwargs): + ctx.states.append('success') - def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception + def failure_handler(ctx, exception, *args, **kwargs): + ctx.states.append('failure') + ctx.exception = exception events.start_task_signal.connect(start_handler) events.on_success_task_signal.connect(success_handler) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/907ed6eb/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 058190e..bca2ea3 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -30,7 +30,6 @@ from tests.fixtures import ( # pylint: disable=unused-import plugin_manager, fs_model as model ) -from . import MockTask class TestProcessExecutor(object):