Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-283-update-readme-installation-instructions 1923a6731 -> a7519349c (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 5dd2855..f5fb17a 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from networkx import topological_sort, DiGraph +from networkx import topological_sort +from aria.modeling import models from aria.orchestrator import context -from aria.orchestrator.workflows import api, core +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import compile from aria.orchestrator.workflows.executor import base - from tests import mock from tests import storage @@ -26,8 +27,8 @@ from tests import storage def test_task_graph_into_execution_graph(tmpdir): interface_name = 'Standard' operation_name = 'create' - task_context = mock.context.simple(str(tmpdir)) - node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, @@ -35,12 +36,12 @@ def test_task_graph_into_execution_graph(tmpdir): operation_kwargs=dict(function='test') ) node.interfaces[interface.name] = interface - task_context.model.node.update(node) + workflow_context.model.node.update(node) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) - with context.workflow.current.push(task_context): + with context.workflow.current.push(workflow_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') simple_before_task = api.task.OperationTask( node, @@ -64,12 +65,9 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(inner_task_graph, simple_before_task) test_task_graph.add_dependency(simple_after_task, inner_task_graph) - # Direct check - execution_graph = DiGraph() - core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph, - default_executor=base.StubTaskExecutor()) - execution_tasks = topological_sort(execution_graph) + compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor) + + execution_tasks = topological_sort(workflow_context._graph) assert len(execution_tasks) == 7 @@ -83,30 +81,23 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == execution_tasks - - assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), - core.task.StartWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), - simple_before_task) - assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), - core.task.StartSubWorkflowTask) + assert expected_tasks_names == [t._api_id for t in execution_tasks] + assert all(isinstance(task, models.Task) for task in execution_tasks) + execution_tasks = iter(execution_tasks) - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), - inner_task) - assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), - core.task.EndSubWorkflowTask) + assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_before_task) + assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW + _assert_execution_is_api_task(next(execution_tasks), inner_task) + assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_after_task) + assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), - simple_after_task) - assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), - core.task.EndWorkflowTask) - storage.release_sqlite_storage(task_context.model) + storage.release_sqlite_storage(workflow_context.model) def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task.id == api_task.id + assert execution_task._api_id == api_task.id assert execution_task.name == api_task.name assert execution_task.function == api_task.function assert execution_task.actor == api_task.actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ac6d325..83584a6 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,69 +12,80 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import uuid import logging -from collections import namedtuple +import uuid from contextlib import contextmanager import aria from aria.modeling import models +class MockContext(object): + + def __init__(self, storage, task_kwargs=None): + self.logger = logging.getLogger('mock_logger') + self._task_kwargs = task_kwargs or {} + self._storage = storage + self.task = MockTask(storage, **task_kwargs) + self.states = [] + self.exception = None + + @property + def serialization_dict(self): + return { + 'context_cls': self.__class__, + 'context': { + 'storage_kwargs': self._storage.serialization_dict, + 'task_kwargs': self._task_kwargs + } + } + + def __getattr__(self, item): + return None + + def close(self): + pass + + @classmethod + def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): + return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), + task_kwargs=(task_kwargs or {})) + + @property + @contextmanager + def persist_changes(self): + yield + + +class MockActor(object): + def __init__(self): + self.name = 'actor_name' + + class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES - def __init__(self, function, arguments=None, plugin=None, storage=None): + def __init__(self, model, function, arguments=None, plugin_fk=None): self.function = self.name = function - self.plugin_fk = plugin.id if plugin else None - self.plugin = plugin or None + self.plugin_fk = plugin_fk self.arguments = arguments or {} self.states = [] self.exception = None self.id = str(uuid.uuid4()) self.logger = logging.getLogger() - self.context = MockContext(storage) self.attempts_count = 1 self.max_attempts = 1 self.ignore_failure = False self.interface_name = 'interface_name' self.operation_name = 'operation_name' - self.actor = namedtuple('actor', 'name')(name='actor_name') - self.model_task = None + self.actor = MockActor() + self.node = self.actor + self.model = model for state in models.Task.STATES: setattr(self, state.upper(), state) - @contextmanager - def _update(self): - yield self - - -class MockContext(object): - - def __init__(self, storage=None): - self.logger = logging.getLogger('mock_logger') - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) - self.model = storage - @property - def serialization_dict(self): - if self.model: - return {'context': self.model.serialization_dict, 'context_cls': self.__class__} - else: - return {'context_cls': self.__class__, 'context': {}} - - def __getattr__(self, item): - return None - - @classmethod - def instantiate_from_dict(cls, **kwargs): - if kwargs: - return cls(storage=aria.application_model_storage(**kwargs)) - else: - return cls() - - @staticmethod - def close(): - pass + def plugin(self): + return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 3079c60..32a68e0 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -35,7 +35,7 @@ from aria.orchestrator.workflows.executor import ( ) import tests -from . import MockTask +from . import MockContext def _get_function(func): @@ -44,11 +44,17 @@ def _get_function(func): def execute_and_assert(executor, storage=None): expected_value = 'value' - successful_task = MockTask(_get_function(mock_successful_task), storage=storage) - failing_task = MockTask(_get_function(mock_failing_task), storage=storage) - task_with_inputs = MockTask(_get_function(mock_task_with_input), - arguments={'input': models.Argument.wrap('input', 'value')}, - storage=storage) + successful_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_successful_task)) + ) + failing_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_failing_task)) + ) + task_with_inputs = MockContext( + storage, + task_kwargs=dict(function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}) + ) for task in [successful_task, failing_task, task_with_inputs]: executor.execute(task) @@ -95,10 +101,10 @@ class MockException(Exception): @pytest.fixture def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) @pytest.fixture(params=[ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 058190e..755b9be 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -28,15 +28,17 @@ import tests.resources from tests.fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, - fs_model as model ) -from . import MockTask +from . import MockContext class TestProcessExecutor(object): - def test_plugin_execution(self, executor, mock_plugin, storage): - task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage) + def test_plugin_execution(self, executor, mock_plugin, model): + ctx = MockContext( + model, + task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id) + ) queue = Queue.Queue() @@ -46,7 +48,7 @@ class TestProcessExecutor(object): events.on_success_task_signal.connect(handler) events.on_failure_task_signal.connect(handler) try: - executor.execute(task) + executor.execute(ctx) error = queue.get(timeout=60) # tests/resources/plugins/mock-plugin1 is the plugin installed # during this tests setup. The module mock_plugin1 contains a single @@ -63,10 +65,10 @@ class TestProcessExecutor(object): events.on_success_task_signal.disconnect(handler) events.on_failure_task_signal.disconnect(handler) - def test_closed(self, executor): + def test_closed(self, executor, model): executor.close() with pytest.raises(RuntimeError) as exc_info: - executor.execute(task=MockTask(function='some.function')) + executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) assert 'closed' in exc_info.value.message @@ -85,8 +87,8 @@ def mock_plugin(plugin_manager, tmpdir): @pytest.fixture -def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) +def model(tmpdir): + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 5f0b75f..ba98c4f 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -17,7 +17,7 @@ import pytest from aria import extension from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation @@ -57,8 +57,9 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() + compile.create_execution_tasks(context, graph, executor.__class__) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) out = get_node(context).attributes.get('out').value assert out['wrapper_arguments'] == arguments assert out['function_arguments'] == arguments http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 7dbcc5a..2f1c325 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -18,7 +18,7 @@ import copy import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation from aria.orchestrator.workflows import exceptions @@ -107,8 +107,9 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() + compile.create_execution_tasks(context, graph, executor.__class__) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') return out.value if out else None