Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-14-workflow-engine-tests 1efdef505 -> d08040bd6 (forced update)
ARIA-14 Implement initial engine tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/d08040bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/d08040bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/d08040bd Branch: refs/heads/ARIA-14-workflow-engine-tests Commit: d08040bd6ca7bd9f0b27c06ccbc5b36c459db86b Parents: c0bf347 Author: Dan Kilman <dankil...@gmail.com> Authored: Tue Nov 1 16:42:34 2016 +0200 Committer: Dan Kilman <dankil...@gmail.com> Committed: Wed Nov 2 17:49:04 2016 +0200 ---------------------------------------------------------------------- aria/contexts.py | 4 +- aria/events/__init__.py | 1 + aria/events/builtin_event_handler.py | 15 ++- aria/storage/models.py | 6 +- aria/tools/application.py | 10 +- aria/workflows/core/engine.py | 23 ++-- aria/workflows/core/tasks.py | 33 +++-- tests/workflows/test_engine.py | 202 ++++++++++++++++++++++++++++++ 8 files changed, 266 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/contexts.py ---------------------------------------------------------------------- diff --git a/aria/contexts.py b/aria/contexts.py index ae7fc66..fdd26a2 100644 --- a/aria/contexts.py +++ b/aria/contexts.py @@ -201,11 +201,11 @@ class OperationContext(LoggerMixin): """ The model operation """ - return self.storage.operation.get(self.id) + return self.model.operation.get(self.id) @operation.setter def operation(self, value): """ Store the operation in the model storage """ - self.storage.operation.store(value) + self.model.operation.store(value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index 6b07213..74f3e22 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -39,6 +39,7 @@ from blinker import signal from ..tools.plugin import plugin_installer # workflow engine task signals: +sent_task_signal = signal('sent_task_signal') start_task_signal = signal('start_task_signal') on_success_task_signal = signal('success_task_signal') on_failure_task_signal = signal('failure_task_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index ec3238f..2dfbd00 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -27,12 +27,21 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, + sent_task_signal, start_task_signal, on_success_task_signal, on_failure_task_signal, ) +@sent_task_signal.connect +def _task_sent(task, *args, **kwargs): + operation_context = task.context + operation = operation_context.operation + operation.status = operation.SENT + operation_context.operation = operation + + @start_task_signal.connect def _task_started(task, *args, **kwargs): operation_context = task.context @@ -62,7 +71,7 @@ def _task_succeeded(task, *args, **kwargs): @start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - execution_cls = workflow_context.storage.execution.model_cls + execution_cls = workflow_context.model.execution.model_cls execution = execution_cls( id=workflow_context.execution_id, deployment_id=workflow_context.deployment_id, @@ -80,7 +89,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): execution = workflow_context.execution execution.error = str(exception) execution.status = execution.FAILED - execution.ended_at = datetime.utcnow(), + execution.ended_at = datetime.utcnow() workflow_context.execution = execution @@ -88,5 +97,5 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): def _workflow_succeeded(workflow_context, *args, **kwargs): execution = workflow_context.execution execution.status = execution.TERMINATED - execution.ended_at = datetime.utcnow(), + execution.ended_at = datetime.utcnow() workflow_context.execution = execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 32403ed..e5fc1ac 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -221,11 +221,13 @@ class Operation(Model): A Model which represents an operation """ PENDING = 'pending' + SENT = 'sent' STARTED = 'started' SUCCESS = 'success' FAILED = 'failed' STATES = ( PENDING, + SENT, STARTED, SUCCESS, FAILED, @@ -233,9 +235,9 @@ class Operation(Model): END_STATES = [SUCCESS, FAILED] id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES, default=STARTED) + status = Field(type=basestring, choices=STATES, default=PENDING) execution_id = Field(type=basestring) - eta = Field(type=datetime, default=0) + eta = Field(type=datetime, default=datetime.now) started_at = Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) max_retries = Field(type=int, default=0) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/tools/application.py ---------------------------------------------------------------------- diff --git a/aria/tools/application.py b/aria/tools/application.py index 32feeff..ddc1317 100644 --- a/aria/tools/application.py +++ b/aria/tools/application.py @@ -85,11 +85,11 @@ class StorageManager(LoggerMixin): Create a StorageManager from a blueprint """ return cls( - model_storage, - resource_storage, - blueprint_path, - blueprint_plan, - blueprint_id, + model_storage=model_storage, + resource_storage=resource_storage, + blueprint_path=blueprint_path, + blueprint_plan=blueprint_plan, + blueprint_id=blueprint_id, deployment_id=None, deployment_plan=None) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 9c6eff8..5cd4604 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -18,12 +18,15 @@ The workflow engine. Executes workflows """ import time +from datetime import datetime import networkx from aria import events, logger +from aria.storage import models from . import translation +from . import tasks class Engine(logger.LoggerMixin): @@ -34,10 +37,9 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) self._workflow_context = workflow_context - self._tasks_graph = tasks_graph self._execution_graph = networkx.DiGraph() self._executor = executor - translation.build_execution_graph(task_graph=self._tasks_graph, + translation.build_execution_graph(task_graph=tasks_graph, workflow_context=workflow_context, execution_graph=self._execution_graph) @@ -62,17 +64,18 @@ class Engine(logger.LoggerMixin): raise def _executable_tasks(self): - now = time.time() + now = datetime.now() return (task for task in self._tasks_iter() - if task.status == task.PENDING and + if task.status == models.Operation.PENDING and task.eta <= now and not self._task_has_dependencies(task)) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.status in task.END_STATES) + return (task for task in self._tasks_iter() + if task.status in models.Operation.END_STATES) def _task_has_dependencies(self, task): - return len(self._execution_graph.succ.get(task.id, {})) > 0 + return len(self._execution_graph.pred.get(task.id, {})) > 0 def _all_tasks_consumed(self): return len(self._execution_graph.node) == 0 @@ -81,10 +84,14 @@ class Engine(logger.LoggerMixin): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) def _handle_executable_task(self, task): - self._executor.execute(task) + if isinstance(task, tasks.BaseWorkflowTask): + task.status = models.Operation.SUCCESS + else: + events.sent_task_signal.send(task) + self._executor.execute(task) def _handle_ended_tasks(self, task): - if task.status == task.FAILED: + if task.status == models.Operation.FAILED: raise RuntimeError('Workflow failed') else: self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/aria/workflows/core/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py index 76ae609..98d7c13 100644 --- a/aria/workflows/core/tasks.py +++ b/aria/workflows/core/tasks.py @@ -17,13 +17,19 @@ Workflow tasks """ +from datetime import datetime -class BaseTask(object): +from aria import logger +from aria.storage import models + + +class BaseTask(logger.LoggerMixin): """ Base class for Task objects """ - def __init__(self, id, name, context): + def __init__(self, id, name, context, *args, **kwargs): + super(BaseTask, self).__init__(*args, **kwargs) self._id = id self._name = name self._context = context @@ -50,28 +56,39 @@ class BaseTask(object): return self._context -class StartWorkflowTask(BaseTask): +class BaseWorkflowTask(BaseTask): + """ + Base class for all workflow wrapping tasks + """ + + def __init__(self, *args, **kwargs): + super(BaseWorkflowTask, self).__init__(*args, **kwargs) + self.status = models.Operation.PENDING + self.eta = datetime.now() + + +class StartWorkflowTask(BaseWorkflowTask): """ Tasks marking a workflow start """ pass -class EndWorkflowTask(BaseTask): +class EndWorkflowTask(BaseWorkflowTask): """ Tasks marking a workflow end """ pass -class StartSubWorkflowTask(BaseTask): +class StartSubWorkflowTask(BaseWorkflowTask): """ Tasks marking a subworkflow start """ pass -class EndSubWorkflowTask(BaseTask): +class EndSubWorkflowTask(BaseWorkflowTask): """ Tasks marking a subworkflow end """ @@ -88,7 +105,7 @@ class OperationTask(BaseTask): self._create_operation_in_storage() def _create_operation_in_storage(self): - operation_cls = self.context.storage.operation.model_cls + operation_cls = self.context.model.operation.model_cls operation = operation_cls( id=self.context.id, execution_id=self.context.execution_id, @@ -99,6 +116,6 @@ class OperationTask(BaseTask): def __getattr__(self, attr): try: - return getattr(self.context, attr) + return getattr(self.context.operation, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d08040bd/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py new file mode 100644 index 0000000..2488264 --- /dev/null +++ b/tests/workflows/test_engine.py @@ -0,0 +1,202 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import pytest + +import aria +from aria import events +from aria import workflow +from aria import contexts +from aria.tools import application +from aria.storage import drivers +from aria.workflows.executor import thread +from aria.workflows.core import engine + +logging.basicConfig() + + +global_test_holder = {} + + +class TestEngine(object): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + + def test_single_task_successful_execution(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + graph.add_task(_op(mock_success_task, context)) + execution_tasks = self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks) + + def test_single_task_failed_execution(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + graph.add_task(_op(mock_failed_task, context)) + with pytest.raises(RuntimeError): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, RuntimeError) + + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + op1 = _op(mock_ordered_task, context, inputs={'counter': 1}) + op2 = _op(mock_ordered_task, context, inputs={'counter': 2}) + graph.chain(tasks=[op1, op2]) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert global_test_holder.get('invocations') == [1, 2] + + @staticmethod + def _execute(workflow_func, workflow_context, executor): + graph = workflow_func(context=workflow_context) + eng = engine.Engine(executor=executor, + workflow_context=workflow_context, + tasks_graph=graph) + execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)] + eng.execute() + return execution_tasks + + @pytest.fixture(scope='function', autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(scope='function', autouse=True) + def signals_registration(self, ): + def sent_task_handler(task, *args, **kwargs): + task.sent = True + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture(scope='function') + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture(scope='function') + def workflow_context(self, tmpdir): + from dsl_parser.parser import parse_from_path + from dsl_parser.tasks import prepare_deployment_plan + blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}' + blueprint_dir = tmpdir.mkdir('blueprint') + blueprint_path = blueprint_dir.join('blueprint.yaml') + blueprint_path.write(blueprint) + blueprint_plan = parse_from_path(str(blueprint_path)) + blueprint_id = 'b1' + deployment_plan = prepare_deployment_plan(blueprint_plan.copy()) + deployment_id = 'd1' + work_dir = tmpdir.mkdir('work') + storage_dir = work_dir.mkdir('storage') + model_storage_dir = storage_dir.mkdir('model') + resource_storage_dir = storage_dir.mkdir('resource') + model_storage = aria.application_model_storage( + drivers.FileSystemModelDriver(str(model_storage_dir))) + resource_storage = aria.application_resource_storage( + drivers.FileSystemResourceDriver(str(resource_storage_dir))) + resource_storage.setup() + model_storage.setup() + storage_manager = application.StorageManager( + model_storage=model_storage, + resource_storage=resource_storage, + blueprint_path=blueprint_path, + blueprint_id=blueprint_id, + blueprint_plan=blueprint_plan, + deployment_id=deployment_id, + deployment_plan=deployment_plan + ) + storage_manager.create_blueprint_storage( + source=str(blueprint_path), + main_file_name='blueprint.yaml') + storage_manager.create_nodes_storage() + storage_manager.create_deployment_storage() + storage_manager.create_node_instances_storage() + result = contexts.WorkflowContext( + name='test', + model_storage=model_storage, + resource_storage=resource_storage, + deployment_id=deployment_id, + workflow_id='name') + result.states = [] + result.exception = None + return result + + +def _op(function, context, inputs=None): + return context.operation( + name='task', + node_instance=None, + operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( + name=function.__name__)}, + inputs=inputs + ) + + +def mock_success_task(): + pass + + +def mock_failed_task(): + raise RuntimeError + + +def mock_ordered_task(counter): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter)