Repository: incubator-ariatosca Updated Branches: refs/heads/master 47eaf04f0 -> 8947f72cb
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/__init__.py b/tests/workflows/builtin/__init__.py new file mode 100644 index 0000000..c82c024 --- /dev/null +++ b/tests/workflows/builtin/__init__.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ... 
import mock + +def assert_node_install_operations(operations, with_relationships=False): + if with_relationships: + all_operations = [ + 'aria.interfaces.lifecycle.create', + 'aria.interfaces.relationship_lifecycle.preconfigure', + 'aria.interfaces.relationship_lifecycle.preconfigure', + 'aria.interfaces.lifecycle.configure', + 'aria.interfaces.relationship_lifecycle.postconfigure', + 'aria.interfaces.relationship_lifecycle.postconfigure', + 'aria.interfaces.lifecycle.start', + 'aria.interfaces.relationship_lifecycle.establish', + 'aria.interfaces.relationship_lifecycle.establish', + ] + + for i, operation in enumerate(operations): + assert operation.name.startswith(all_operations[i]) + else: + for i, operation in enumerate(operations): + assert operation.name.startswith(mock.operations.NODE_OPERATIONS_INSTALL[i]) + + +def assert_node_uninstall_operations(operations, with_relationships=False): + if with_relationships: + all_operations = [ + 'aria.interfaces.lifecycle.stop', + 'aria.interfaces.relationship_lifecycle.unlink', + 'aria.interfaces.relationship_lifecycle.unlink', + 'aria.interfaces.lifecycle.delete', + ] + + for i, operation in enumerate(operations): + assert operation.name.startswith(all_operations[i]) + else: + for i, operation in enumerate(operations): + assert operation.name.startswith(mock.operations.NODE_OPERATIONS_UNINSTALL[i]) + + +def ctx_with_basic_graph(): + """ + Create the following graph in storage: + dependency_node <------ dependent_node + :return: + """ + simple_context = mock.context.simple() + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance( + dependency_node=dependency_node) + + relationship = mock.models.get_relationship(dependency_node) + relationship_instance = mock.models.get_relationship_instance( + relationship=relationship, + target_instance=dependency_node_instance + ) + + dependent_node = mock.models.get_dependent_node(relationship) + 
dependent_node_instance = mock.models.get_dependent_node_instance( + dependent_node=dependent_node, + relationship_instance=relationship_instance + ) + + simple_context.model.node.store(dependent_node) + simple_context.model.node.store(dependency_node) + simple_context.model.node_instance.store(dependent_node_instance) + simple_context.model.node_instance.store(dependency_node_instance) + simple_context.model.relationship.store(relationship) + simple_context.model.relationship_instance.store(relationship_instance) + + return simple_context http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/test_execute_operation.py b/tests/workflows/builtin/test_execute_operation.py new file mode 100644 index 0000000..9409686 --- /dev/null +++ b/tests/workflows/builtin/test_execute_operation.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.workflows.api import task +from aria.workflows.builtin.execute_operation import execute_operation + +from ... import mock +from . 
import ctx_with_basic_graph + + [email protected] +def ctx(): + return ctx_with_basic_graph() + + +def test_execute_operation(ctx): + operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + node_instance_id = 'dependency_node_instance' + + execute_tasks = list( + task.WorkflowTask( + execute_operation, + ctx=ctx, + operation=operation_name, + operation_kwargs={}, + allow_kwargs_override=False, + run_by_dependency_order=False, + type_names=[], + node_ids=[], + node_instance_ids=[node_instance_id] + ).topological_order() + ) + + assert len(execute_tasks) == 1 + assert execute_tasks[0].name == '{0}.{1}'.format(node_instance_id, operation_name) + +# TODO: add more scenarios http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/builtin/test_heal.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/test_heal.py b/tests/workflows/builtin/test_heal.py new file mode 100644 index 0000000..edffe2c --- /dev/null +++ b/tests/workflows/builtin/test_heal.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.workflows.api import task +from aria.workflows.builtin.heal import heal + +from . 
import (assert_node_install_operations, + assert_node_uninstall_operations, + ctx_with_basic_graph) + + [email protected] +def ctx(): + return ctx_with_basic_graph() + + +def test_heal_dependent_node(ctx): + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependent_node_instance') + + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + assert len(list(uninstall_subgraph.tasks)) == 2 + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = \ + list(uninstall_subgraph.topological_order(reverse=True)) + + assert len(list(install_subgraph.tasks)) == 2 + dependency_node_subgraph_install, dependent_node_subgraph_install = \ + list(install_subgraph.topological_order(reverse=True)) + + dependent_node_uninstall_tasks = \ + list(dependent_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_uninstall, task.StubTask) + dependent_node_install_tasks = \ + list(dependent_node_subgraph_install.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_install, task.StubTask) + + assert_node_uninstall_operations(dependent_node_uninstall_tasks, with_relationships=True) + assert_node_install_operations(dependent_node_install_tasks, with_relationships=True) + + +def test_heal_dependency_node(ctx): + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependency_node_instance') + # both subgraphs should contain un\install for both the dependent and the dependency + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True)) + assert len(uninstall_tasks) == 4 + unlink_source, unlink_target = uninstall_tasks[:2] + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = uninstall_tasks[2:] + + install_tasks = 
list(install_subgraph.topological_order(reverse=True)) + assert len(install_tasks) == 4 + dependency_node_subgraph_install, dependent_node_subgraph_install = install_tasks[:2] + establish_source, establish_target = install_tasks[2:] + + assert isinstance(dependent_node_subgraph_uninstall, task.StubTask) + dependency_node_uninstall_tasks = \ + list(dependency_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependent_node_subgraph_install, task.StubTask) + dependency_node_install_tasks = \ + list(dependency_node_subgraph_install.topological_order(reverse=True)) + + assert unlink_source.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert unlink_target.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert_node_uninstall_operations(dependency_node_uninstall_tasks) + + assert_node_install_operations(dependency_node_install_tasks) + assert establish_source.name.startswith('aria.interfaces.relationship_lifecycle.establish') + assert establish_target.name.startswith('aria.interfaces.relationship_lifecycle.establish') + + +# TODO: add tests for contained in scenario http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/builtin/test_install.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/test_install.py b/tests/workflows/builtin/test_install.py new file mode 100644 index 0000000..29212e3 --- /dev/null +++ b/tests/workflows/builtin/test_install.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.workflows.builtin.install import install +from aria.workflows.api import task + +from . import (assert_node_install_operations, + ctx_with_basic_graph) + + [email protected] +def ctx(): + return ctx_with_basic_graph() + + +def test_install(ctx): + install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True)) + + assert len(install_tasks) == 2 + dependency_node_subgraph, dependent_node_subgraph = install_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True)) + + assert_node_install_operations(dependency_node_tasks) + assert_node_install_operations(dependent_node_tasks, with_relationships=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/builtin/test_uninstall.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/test_uninstall.py b/tests/workflows/builtin/test_uninstall.py new file mode 100644 index 0000000..2d00673 --- /dev/null +++ b/tests/workflows/builtin/test_uninstall.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.workflows.api import task +from aria.workflows.builtin.uninstall import uninstall + +from . import (assert_node_uninstall_operations, + ctx_with_basic_graph) + + [email protected] +def ctx(): + return ctx_with_basic_graph() + + +def test_uninstall(ctx): + uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True)) + + assert len(uninstall_tasks) == 2 + dependent_node_subgraph, dependency_node_subgraph = uninstall_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True)) + + assert_node_uninstall_operations(operations=dependency_node_tasks) + assert_node_uninstall_operations(operations=dependent_node_tasks, with_relationships=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/__init__.py b/tests/workflows/core/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/workflows/core/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/core/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py new file mode 100644 index 0000000..8ec0303 --- /dev/null +++ b/tests/workflows/core/test_executor.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import uuid +from contextlib import contextmanager + +import pytest +import retrying + +from aria import events +from aria.storage import models +from aria.workflows.executor import ( + thread, + multiprocess, + blocking, + # celery +) + +try: + import celery as _celery + app = _celery.Celery() + app.conf.update(CELERY_RESULT_BACKEND='amqp://') +except ImportError: + _celery = None + app = None + + +class TestExecutor(object): + + @pytest.mark.parametrize('executor_cls,executor_kwargs', [ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + (multiprocess.MultiprocessExecutor, {'pool_size': 1}), + (multiprocess.MultiprocessExecutor, {'pool_size': 2}), + (blocking.CurrentThreadBlockingExecutor, {}), + # (celery.CeleryExecutor, {'app': app}) + ]) + def test_execute(self, executor_cls, executor_kwargs): + self.executor = executor_cls(**executor_kwargs) + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + self.executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() + + def setup_method(self): + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + + def teardown_method(self): + events.start_task_signal.disconnect(start_handler) + 
events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) + if hasattr(self, 'executor'): + self.executor.close() + + +def mock_successful_task(): + pass + + +def mock_failing_task(): + raise MockException + + +def mock_task_with_input(input): + raise MockException(input) + +if app: + mock_successful_task = app.task(mock_successful_task) + mock_failing_task = app.task(mock_failing_task) + mock_task_with_input = app.task(mock_task_with_input) + + +class MockException(Exception): + pass + + +class MockTask(object): + + def __init__(self, func, inputs=None): + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + name = func.__name__ + operation = 'tests.workflows.core.test_executor.{name}'.format(name=name) + self.operation_details = {'operation': operation} + self.logger = logging.getLogger() + self.name = name + self.inputs = inputs or {} + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def update(self): + yield self + + +def start_handler(task, *args, **kwargs): + task.states.append('start') + + +def success_handler(task, *args, **kwargs): + task.states.append('success') + + +def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py new file mode 100644 index 0000000..75e825f --- /dev/null +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from networkx import topological_sort, DiGraph + +from aria import context +from aria.workflows import api, core + +from ... import mock + + +def test_task_graph_into_execution_graph(): + task_context = mock.context.simple() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance() + task_context.model.node.store(node) + task_context.model.node_instance.store(node_instance) + + def sub_workflow(name, **_): + return api.task_graph.TaskGraph(name) + + with context.workflow.current.push(task_context): + test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') + simple_before_task = api.task.OperationTask('test_simple_before_task', {}, node_instance) + simple_after_task = api.task.OperationTask('test_simple_after_task', {}, node_instance) + + inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') + inner_task = api.task.OperationTask('test_inner_task', {}, node_instance) + inner_task_graph.add_tasks(inner_task) + + test_task_graph.add_tasks(simple_before_task) + test_task_graph.add_tasks(simple_after_task) + test_task_graph.add_tasks(inner_task_graph) + test_task_graph.add_dependency(inner_task_graph, simple_before_task) + test_task_graph.add_dependency(simple_after_task, inner_task_graph) + + # 
Direct check + execution_graph = DiGraph() + core.translation.build_execution_graph(task_graph=test_task_graph, + execution_graph=execution_graph) + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + expected_tasks_names = [ + '{0}-Start'.format(test_task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(test_task_graph.id) + ] + + assert expected_tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), + core.task.StartWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), + simple_before_task) + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), + core.task.StartSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), + inner_task) + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), + core.task.EndSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), + simple_after_task) + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), + core.task.EndWorkflowTask) + + +def _assert_execution_is_api_task(execution_task, api_task): + assert execution_task.id == api_task.id + assert execution_task.name == api_task.name + assert execution_task.operation_details == api_task.operation_details + assert execution_task.node_instance == api_task.node_instance + assert execution_task.inputs == api_task.inputs + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py index 
03a9d19..ea703f5 100644 --- a/tests/workflows/test_engine.py +++ b/tests/workflows/test_engine.py @@ -20,11 +20,14 @@ import pytest import aria from aria import events from aria import workflow -from aria import contexts +from aria import context from aria.storage import models from aria.workflows import exceptions from aria.workflows.executor import thread from aria.workflows.core import engine +from aria.workflows import api + +from .. import mock import tests.storage @@ -47,8 +50,8 @@ class TestEngine(object): def test_single_task_successful_execution(self, workflow_context, executor): @workflow - def mock_workflow(context, graph): - graph.add_task(self._op(mock_success_task, context)) + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_success_task, ctx)) self._execute( workflow_func=mock_workflow, workflow_context=workflow_context, @@ -59,8 +62,8 @@ class TestEngine(object): def test_single_task_failed_execution(self, workflow_context, executor): @workflow - def mock_workflow(context, graph): - graph.add_task(self._op(mock_failed_task, context)) + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_failed_task, ctx)) with pytest.raises(exceptions.ExecutorException): self._execute( workflow_func=mock_workflow, @@ -72,10 +75,10 @@ class TestEngine(object): def test_two_tasks_execution_order(self, workflow_context, executor): @workflow - def mock_workflow(context, graph): - op1 = self._op(mock_ordered_task, context, inputs={'counter': 1}) - op2 = self._op(mock_ordered_task, context, inputs={'counter': 2}) - graph.chain(tasks=[op1, op2]) + def mock_workflow(ctx, graph): + op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + graph.sequence(op1, op2) self._execute( workflow_func=mock_workflow, workflow_context=workflow_context, @@ -87,19 +90,17 @@ class TestEngine(object): @staticmethod def _execute(workflow_func, workflow_context, executor): - graph = 
workflow_func(context=workflow_context) - eng = engine.Engine(executor=executor, - workflow_context=workflow_context, - tasks_graph=graph) + graph = workflow_func(ctx=workflow_context) + eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) eng.execute() @staticmethod - def _op(function, context, inputs=None): - return context.operation( + def _op(func, ctx, inputs=None): + return api.task.OperationTask( name='task', - node_instance=None, operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=function.__name__)}, + name=func.__name__)}, + node_instance=ctx.model.node_instance.get('dependency_node_instance'), inputs=inputs ) @@ -158,7 +159,11 @@ class TestEngine(object): updated_at=datetime.now(), workflows={}) model_storage.deployment.store(deployment) - result = contexts.WorkflowContext( + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + model_storage.node.store(node) + model_storage.node_instance.store(node_instance) + result = context.workflow.WorkflowContext( name='test', model_storage=model_storage, resource_storage=None, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_executor.py b/tests/workflows/test_executor.py deleted file mode 100644 index 7457fd0..0000000 --- a/tests/workflows/test_executor.py +++ /dev/null @@ -1,136 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import uuid - -import pytest -import retrying - -from aria import events -from aria.storage import models -from aria.workflows.executor import ( - thread, - multiprocess, - blocking, - # celery -) - -try: - import celery as _celery - app = _celery.Celery() - app.conf.update(CELERY_RESULT_BACKEND='amqp://') -except ImportError: - _celery = None - app = None - - -class TestExecutor(object): - - @pytest.mark.parametrize('executor_cls,executor_kwargs', [ - (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), - (multiprocess.MultiprocessExecutor, {'pool_size': 1}), - (multiprocess.MultiprocessExecutor, {'pool_size': 2}), - (blocking.CurrentThreadBlockingExecutor, {}), - # (celery.CeleryExecutor, {'app': app}) - ]) - def test_execute(self, executor_cls, executor_kwargs): - self.executor = executor_cls(**executor_kwargs) - expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) - - for task in [successful_task, failing_task, task_with_inputs]: - self.executor.execute(task) - - @retrying.retry(stop_max_delay=10000, wait_fixed=100) - def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert 
task_with_inputs.exception.message == expected_value - assertion() - - def setup_method(self): - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - - def teardown_method(self): - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) - if self.executor: - self.executor.close() - - -def mock_successful_task(): - pass - - -def mock_failing_task(): - raise MockException - - -def mock_task_with_input(input): - raise MockException(input) - -if app: - mock_successful_task = app.task(mock_successful_task) - mock_failing_task = app.task(mock_failing_task) - mock_task_with_input = app.task(mock_task_with_input) - - -class MockException(Exception): - pass - - -class MockContext(object): - - def __init__(self, operation_details, inputs): - self.operation_details = operation_details - self.inputs = inputs - self.operation = models.Operation(execution_id='') - - -class MockTask(object): - - def __init__(self, func, inputs=None): - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - name = func.__name__ - operation = 'tests.workflows.test_executor.{name}'.format(name=name) - self.context = MockContext(operation_details={'operation': operation}, - inputs=inputs or {}) - self.logger = logging.getLogger() - self.name = name - - -def start_handler(task, *args, **kwargs): - task.states.append('start') - - -def success_handler(task, *args, **kwargs): - task.states.append('success') - - -def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git 
a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py deleted file mode 100644 index 1bae713..0000000 --- a/tests/workflows/test_task_graph_into_exececution_graph.py +++ /dev/null @@ -1,79 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
import pytest
from networkx import topological_sort, DiGraph

from aria import contexts
from aria.workflows.api import tasks_graph
from aria.workflows.core import tasks, translation


@pytest.fixture(autouse=True)
def no_storage(monkeypatch):
    """Stub out the storage write so operation tasks need no backing store."""
    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage',
                        value=lambda *args, **kwargs: None)


def test_task_graph_into_execution_graph():
    """Translating a nested task graph yields the expected execution order.

    Builds ``before -> (inner graph with one task) -> after`` and checks both
    the topological ordering and the concrete task types in the result.
    """
    task_graph = tasks_graph.TaskGraph('test_task_graph')
    simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None)
    simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None)

    inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph')
    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
    inner_task_graph.add_task(inner_task)

    task_graph.add_task(simple_before_task)
    task_graph.add_task(simple_after_task)
    task_graph.add_task(inner_task_graph)
    task_graph.dependency(inner_task_graph, [simple_before_task])
    task_graph.dependency(simple_after_task, [inner_task_graph])

    # Translate into a fresh DiGraph and inspect it directly.
    execution_graph = DiGraph()
    translation.build_execution_graph(task_graph=task_graph,
                                      workflow_context=None,
                                      execution_graph=execution_graph)
    execution_tasks = topological_sort(execution_graph)

    assert len(execution_tasks) == 7

    # Start/End markers wrap both the outer graph and the nested sub-graph.
    expected_tasks_names = [
        '{0}-Start'.format(task_graph.id),
        simple_before_task.id,
        '{0}-Start'.format(inner_task_graph.id),
        inner_task.id,
        '{0}-End'.format(inner_task_graph.id),
        simple_after_task.id,
        '{0}-End'.format(task_graph.id)
    ]
    assert expected_tasks_names == execution_tasks

    # Marker nodes carry the expected task types; operation nodes keep their
    # original contexts.
    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
                      tasks.StartWorkflowTask)
    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
                      tasks.StartSubWorkflowTask)
    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
                      tasks.EndSubWorkflowTask)
    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask)


def _get_task_by_name(task_name, graph):
    """Return the task object stored on the graph node named *task_name*."""
    return graph.node[task_name]['task']
