Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-14-workflow-engine-tests f87d50aa2 -> 3fc8ee4f3 (forced update)
ARIA-14 Implement initial engine tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3fc8ee4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3fc8ee4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3fc8ee4f Branch: refs/heads/ARIA-14-workflow-engine-tests Commit: 3fc8ee4f36fef3954bcc1e89c4d97572061b41c5 Parents: c0bf347 Author: Dan Kilman <dankil...@gmail.com> Authored: Tue Nov 1 16:42:34 2016 +0200 Committer: Dan Kilman <dankil...@gmail.com> Committed: Thu Nov 3 12:02:29 2016 +0200 ---------------------------------------------------------------------- aria/contexts.py | 4 +- aria/events/__init__.py | 1 + aria/events/builtin_event_handler.py | 15 ++- aria/storage/models.py | 6 +- aria/tools/application.py | 10 +- aria/workflows/core/engine.py | 26 +++-- aria/workflows/core/tasks.py | 33 ++++-- tests/.pylintrc | 2 +- tests/workflows/test_engine.py | 182 ++++++++++++++++++++++++++++++ 9 files changed, 249 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/contexts.py ---------------------------------------------------------------------- diff --git a/aria/contexts.py b/aria/contexts.py index ae7fc66..fdd26a2 100644 --- a/aria/contexts.py +++ b/aria/contexts.py @@ -201,11 +201,11 @@ class OperationContext(LoggerMixin): """ The model operation """ - return self.storage.operation.get(self.id) + return self.model.operation.get(self.id) @operation.setter def operation(self, value): """ Store the operation in the model storage """ - self.storage.operation.store(value) + self.model.operation.store(value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index 6b07213..74f3e22 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -39,6 +39,7 @@ from blinker import signal from ..tools.plugin import plugin_installer # workflow engine task signals: +sent_task_signal = signal('sent_task_signal') start_task_signal = signal('start_task_signal') on_success_task_signal = signal('success_task_signal') on_failure_task_signal = signal('failure_task_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index ec3238f..2dfbd00 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -27,12 +27,21 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, + sent_task_signal, start_task_signal, on_success_task_signal, on_failure_task_signal, ) +@sent_task_signal.connect +def _task_sent(task, *args, **kwargs): + operation_context = task.context + operation = operation_context.operation + operation.status = operation.SENT + operation_context.operation = operation + + @start_task_signal.connect def _task_started(task, *args, **kwargs): operation_context = task.context @@ -62,7 +71,7 @@ def _task_succeeded(task, *args, **kwargs): @start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - execution_cls = workflow_context.storage.execution.model_cls + execution_cls = workflow_context.model.execution.model_cls execution = execution_cls( id=workflow_context.execution_id, deployment_id=workflow_context.deployment_id, @@ -80,7 +89,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): execution = workflow_context.execution execution.error = str(exception) execution.status = execution.FAILED - execution.ended_at = datetime.utcnow(), + execution.ended_at = datetime.utcnow() workflow_context.execution = execution @@ -88,5 +97,5 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): def _workflow_succeeded(workflow_context, *args, **kwargs): execution = workflow_context.execution execution.status = execution.TERMINATED - execution.ended_at = datetime.utcnow(), + execution.ended_at = datetime.utcnow() workflow_context.execution = execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 32403ed..e5fc1ac 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -221,11 +221,13 @@ class Operation(Model): A Model which represents an operation """ PENDING = 'pending' + SENT = 'sent' STARTED = 'started' SUCCESS = 'success' FAILED = 'failed' STATES = ( PENDING, + SENT, STARTED, SUCCESS, FAILED, @@ -233,9 +235,9 @@ class Operation(Model): END_STATES = [SUCCESS, FAILED] id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES, default=STARTED) + status = Field(type=basestring, choices=STATES, default=PENDING) execution_id = Field(type=basestring) - eta = Field(type=datetime, default=0) + eta = Field(type=datetime, default=datetime.now) started_at = Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) max_retries = Field(type=int, default=0) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/tools/application.py ---------------------------------------------------------------------- diff --git a/aria/tools/application.py b/aria/tools/application.py index 32feeff..ddc1317 100644 --- a/aria/tools/application.py +++ b/aria/tools/application.py @@ -85,11 +85,11 @@ class StorageManager(LoggerMixin): Create a StorageManager from a blueprint """ return cls( - model_storage, - resource_storage, - blueprint_path, - blueprint_plan, - blueprint_id, + model_storage=model_storage, + resource_storage=resource_storage, + blueprint_path=blueprint_path, + blueprint_plan=blueprint_plan, + blueprint_id=blueprint_id, deployment_id=None, deployment_plan=None) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 9c6eff8..a00bc84 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -18,12 +18,16 @@ The workflow engine. Executes workflows """ import time +from datetime import datetime import networkx from aria import events, logger +from aria.storage import models +from .. import exceptions from . import translation +from . import tasks class Engine(logger.LoggerMixin): @@ -34,10 +38,9 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) self._workflow_context = workflow_context - self._tasks_graph = tasks_graph self._execution_graph = networkx.DiGraph() self._executor = executor - translation.build_execution_graph(task_graph=self._tasks_graph, + translation.build_execution_graph(task_graph=tasks_graph, workflow_context=workflow_context, execution_graph=self._execution_graph) @@ -62,17 +65,18 @@ class Engine(logger.LoggerMixin): raise def _executable_tasks(self): - now = time.time() + now = datetime.now() return (task for task in self._tasks_iter() - if task.status == task.PENDING and + if task.status == models.Operation.PENDING and task.eta <= now and not self._task_has_dependencies(task)) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.status in task.END_STATES) + return (task for task in self._tasks_iter() + if task.status in models.Operation.END_STATES) def _task_has_dependencies(self, task): - return len(self._execution_graph.succ.get(task.id, {})) > 0 + return len(self._execution_graph.pred.get(task.id, {})) > 0 def _all_tasks_consumed(self): return len(self._execution_graph.node) == 0 @@ -81,10 +85,14 @@ class Engine(logger.LoggerMixin): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) def _handle_executable_task(self, task): - self._executor.execute(task) + if isinstance(task, tasks.BaseWorkflowTask): + task.status = models.Operation.SUCCESS + else: + events.sent_task_signal.send(task) + self._executor.execute(task) def _handle_ended_tasks(self, task): - if task.status == task.FAILED: - raise RuntimeError('Workflow failed') + if task.status == models.Operation.FAILED: + raise exceptions.ExecutorException('Workflow failed') else: self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/aria/workflows/core/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py index 76ae609..98d7c13 100644 --- a/aria/workflows/core/tasks.py +++ b/aria/workflows/core/tasks.py @@ -17,13 +17,19 @@ Workflow tasks """ +from datetime import datetime -class BaseTask(object): +from aria import logger +from aria.storage import models + + +class BaseTask(logger.LoggerMixin): """ Base class for Task objects """ - def __init__(self, id, name, context): + def __init__(self, id, name, context, *args, **kwargs): + super(BaseTask, self).__init__(*args, **kwargs) self._id = id self._name = name self._context = context @@ -50,28 +56,39 @@ class BaseTask(object): return self._context -class StartWorkflowTask(BaseTask): +class BaseWorkflowTask(BaseTask): + """ + Base class for all workflow wrapping tasks + """ + + def __init__(self, *args, **kwargs): + super(BaseWorkflowTask, self).__init__(*args, **kwargs) + self.status = models.Operation.PENDING + self.eta = datetime.now() + + +class StartWorkflowTask(BaseWorkflowTask): """ Tasks marking a workflow start """ pass -class EndWorkflowTask(BaseTask): +class EndWorkflowTask(BaseWorkflowTask): """ Tasks marking a workflow end """ pass -class StartSubWorkflowTask(BaseTask): +class StartSubWorkflowTask(BaseWorkflowTask): """ Tasks marking a subworkflow start """ pass -class EndSubWorkflowTask(BaseTask): +class EndSubWorkflowTask(BaseWorkflowTask): """ Tasks marking a subworkflow end """ @@ -88,7 +105,7 @@ class OperationTask(BaseTask): self._create_operation_in_storage() def _create_operation_in_storage(self): - operation_cls = self.context.storage.operation.model_cls + operation_cls = self.context.model.operation.model_cls operation = operation_cls( id=self.context.id, execution_id=self.context.execution_id, @@ -99,6 +116,6 @@ class OperationTask(BaseTask): def __getattr__(self, attr): try: - return getattr(self.context, attr) + return getattr(self.context.operation, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/tests/.pylintrc ---------------------------------------------------------------------- diff --git a/tests/.pylintrc b/tests/.pylintrc index f6cfd7a..0f84473 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -62,7 +62,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init +disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,too-many-locals [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3fc8ee4f/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py new file mode 100644 index 0000000..03a9d19 --- /dev/null +++ b/tests/workflows/test_engine.py @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +import pytest + +import aria +from aria import events +from aria import workflow +from aria import contexts +from aria.storage import models +from aria.workflows import exceptions +from aria.workflows.executor import thread +from aria.workflows.core import engine + +import tests.storage + + +global_test_holder = {} + + +class TestEngine(object): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + + def test_single_task_successful_execution(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + graph.add_task(self._op(mock_success_task, context)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + graph.add_task(self._op(mock_failed_task, context)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(context, graph): + op1 = self._op(mock_ordered_task, context, inputs={'counter': 1}) + op2 = self._op(mock_ordered_task, context, inputs={'counter': 2}) + graph.chain(tasks=[op1, op2]) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + @staticmethod + def _execute(workflow_func, workflow_context, executor): + graph = workflow_func(context=workflow_context) + eng = engine.Engine(executor=executor, + workflow_context=workflow_context, + tasks_graph=graph) + eng.execute() + + @staticmethod + def _op(function, context, inputs=None): + return context.operation( + name='task', + node_instance=None, + operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( + name=function.__name__)}, + inputs=inputs + ) + + @pytest.fixture(scope='function', autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(scope='function', autouse=True) + def signals_registration(self, ): + def sent_task_handler(*args, **kwargs): + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture(scope='function') + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture(scope='function') + def workflow_context(self): + model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) + model_storage.setup() + deployment = models.Deployment( + id='d1', + blueprint_id='b1', + description=None, + created_at=datetime.now(), + updated_at=datetime.now(), + workflows={}) + model_storage.deployment.store(deployment) + result = contexts.WorkflowContext( + name='test', + model_storage=model_storage, + resource_storage=None, + deployment_id=deployment.id, + workflow_id='name') + result.states = [] + result.exception = None + return result + + +def mock_success_task(): + pass + + +def mock_failed_task(): + raise RuntimeError + + +def mock_ordered_task(counter): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter)