Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks f29148af4 -> d517b820e (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 5dd2855..de40fcf 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from networkx import topological_sort, DiGraph - -from aria.orchestrator import context -from aria.orchestrator.workflows import api, core +from networkx import topological_sort + +from aria.modeling import models +from aria.orchestrator import ( + context, + workflow_runner +) +from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import base - from tests import mock from tests import storage @@ -26,8 +29,8 @@ from tests import storage def test_task_graph_into_execution_graph(tmpdir): interface_name = 'Standard' operation_name = 'create' - task_context = mock.context.simple(str(tmpdir)) - node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, @@ -35,12 +38,12 @@ def test_task_graph_into_execution_graph(tmpdir): operation_kwargs=dict(function='test') ) node.interfaces[interface.name] = interface - task_context.model.node.update(node) + workflow_context.model.node.update(node) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) - with context.workflow.current.push(task_context): + with context.workflow.current.push(workflow_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') simple_before_task = api.task.OperationTask( node, @@ -65,11 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(simple_after_task, inner_task_graph) # Direct check - execution_graph = DiGraph() - core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph, - default_executor=base.StubTaskExecutor()) - execution_tasks = topological_sort(execution_graph) + execution = workflow_context.model.execution.list()[0] + + workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor) + workflow_context.execution = execution + + execution_tasks = topological_sort(workflow_context._graph) assert len(execution_tasks) == 7 @@ -83,30 +87,23 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == execution_tasks - - assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), - core.task.StartWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), - simple_before_task) - assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), - core.task.StartSubWorkflowTask) + assert expected_tasks_names == [t.api_id for t in execution_tasks] + assert all(isinstance(task, models.Task) for task in execution_tasks) + execution_tasks = iter(execution_tasks) - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), - inner_task) - assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), - core.task.EndSubWorkflowTask) + assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_before_task) + assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW + _assert_execution_is_api_task(next(execution_tasks), inner_task) + assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW + _assert_execution_is_api_task(next(execution_tasks), simple_after_task) + assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), - simple_after_task) - assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), - core.task.EndWorkflowTask) - storage.release_sqlite_storage(task_context.model) + storage.release_sqlite_storage(workflow_context.model) def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task.id == api_task.id + assert execution_task.api_id == api_task.id assert execution_task.name == api_task.name assert execution_task.function == api_task.function assert execution_task.actor == api_task.actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ac6d325..4bc5c54 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,69 +12,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import uuid import logging -from collections import namedtuple -from contextlib import contextmanager import aria -from aria.modeling import models - - -class MockTask(object): - - INFINITE_RETRIES = models.Task.INFINITE_RETRIES - - def __init__(self, function, arguments=None, plugin=None, storage=None): - self.function = self.name = function - self.plugin_fk = plugin.id if plugin else None - self.plugin = plugin or None - self.arguments = arguments or {} - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - self.logger = logging.getLogger() - self.context = MockContext(storage) - self.attempts_count = 1 - self.max_attempts = 1 - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.actor = namedtuple('actor', 'name')(name='actor_name') - self.model_task = None - - for state in models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self +from aria.orchestrator.context.operation import NodeOperationContext class MockContext(object): - def __init__(self, storage=None): + def __init__(self, storage, **kwargs): self.logger = logging.getLogger('mock_logger') - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) self.model = storage + task = storage.task.model_cls(**kwargs) + self.model.task.put(task) + self._task_id = task.id + self.states = [] + self.exception = None + + @property + def task(self): + return self.model.task.get(self._task_id) @property def serialization_dict(self): if self.model: - return {'context': self.model.serialization_dict, 'context_cls': self.__class__} + context = self.model.serialization_dict + context['task_id'] = self.task_id + return {'context': context, 'context_cls': self.__class__} else: - return {'context_cls': self.__class__, 'context': {}} + return {'context_cls': self.__class__, 'context': {'task': self.task_id}} def __getattr__(self, item): return None @classmethod - def instantiate_from_dict(cls, **kwargs): + def instantiate_from_dict(cls, task_id, **kwargs): if kwargs: - return cls(storage=aria.application_model_storage(**kwargs)) + return cls(task_id=task_id, storage=aria.application_model_storage(**kwargs)) else: - return cls() + return cls(task=task_id, storage=None) @staticmethod def close(): pass + + +def put_to_storage_and_get_ctx(ctx, task): + ctx.model.task.put(task) + op_ctx = NodeOperationContext( + model_storage=ctx.model, + resource_storage=ctx.resource, + workdir=ctx._workdir, + task_id=task.id, + actor_id=task.actor.id if task.actor else None, + service_id=task.execution.service.id, + execution_id=task.execution.id, + name=task.name + ) + op_ctx.states = [] + return op_ctx http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 3079c60..9d97a26 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -17,6 +17,8 @@ import pytest import retrying + + try: import celery as _celery app = _celery.Celery() @@ -25,7 +27,6 @@ except ImportError: _celery = None app = None -import aria from aria.modeling import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( @@ -35,41 +36,59 @@ from aria.orchestrator.workflows.executor import ( ) import tests -from . import MockTask +from tests import mock, storage +from . import put_to_storage_and_get_ctx def _get_function(func): return '{module}.{func.__name__}'.format(module=__name__, func=func) -def execute_and_assert(executor, storage=None): +def execute_and_assert(executor, ctx): + node = ctx.model.node.list()[0] expected_value = 'value' - successful_task = MockTask(_get_function(mock_successful_task), storage=storage) - failing_task = MockTask(_get_function(mock_failing_task), storage=storage) - task_with_inputs = MockTask(_get_function(mock_task_with_input), - arguments={'input': models.Argument.wrap('input', 'value')}, - storage=storage) + successful_ctx = put_to_storage_and_get_ctx( + ctx, + models.Task( + function=_get_function(mock_successful_task), node=node, execution=ctx.execution + ) + ) + failing_ctx = put_to_storage_and_get_ctx( + ctx, + models.Task( + function=_get_function(mock_failing_task), node=node, execution=ctx.execution + ) + ) + ctx_with_inputs = put_to_storage_and_get_ctx( + ctx, + models.Task( + node=node, + function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}, + execution=ctx.execution + ) + ) - for task in [successful_task, failing_task, task_with_inputs]: - executor.execute(task) + for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]: + executor.execute(op_ctx) @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value + assert successful_ctx.states == ['start', 'success'] + assert failing_ctx.states == ['start', 'failure'] + assert ctx_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_ctx.exception, MockException) + assert isinstance(ctx_with_inputs.exception, MockException) + assert ctx_with_inputs.exception.message == expected_value assertion() -def test_thread_execute(thread_executor): - execute_and_assert(thread_executor) +def test_thread_execute(thread_executor, ctx): + execute_and_assert(thread_executor, ctx) -def test_process_execute(process_executor, storage): - execute_and_assert(process_executor, storage) +def test_process_execute(process_executor, ctx): + execute_and_assert(process_executor, ctx) def mock_successful_task(**_): @@ -94,11 +113,11 @@ class MockException(Exception): @pytest.fixture -def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + ctx.states = [] + yield context + storage.release_sqlite_storage(context.model) @pytest.fixture(params=[ @@ -124,15 +143,15 @@ def process_executor(): @pytest.fixture(autouse=True) def register_signals(): - def start_handler(task, *args, **kwargs): - task.states.append('start') + def start_handler(ctx, *args, **kwargs): + ctx.states.append('start') - def success_handler(task, *args, **kwargs): - task.states.append('success') + def success_handler(ctx, *args, **kwargs): + ctx.states.append('success') - def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception + def failure_handler(ctx, exception, *args, **kwargs): + ctx.states.append('failure') + ctx.exception = exception events.start_task_signal.connect(start_handler) events.on_success_task_signal.connect(success_handler) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 058190e..ff2ca2e 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -18,25 +18,37 @@ import Queue import pytest -import aria from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process +from aria.modeling import models -import tests.storage -import tests.resources +from tests import ( + mock, + storage, + resources +) from tests.fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, fs_model as model ) -from . import MockTask +from tests.orchestrator.workflows.executor import put_to_storage_and_get_ctx class TestProcessExecutor(object): - def test_plugin_execution(self, executor, mock_plugin, storage): - task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage) + def test_plugin_execution(self, executor, mock_plugin, ctx): + node = next(ctx.nodes) + task_ctx = put_to_storage_and_get_ctx( + ctx, + models.Task( + function='mock_plugin1.operation', + plugin_fk=mock_plugin.id, + node=node, + execution=ctx.execution + ) + ) queue = Queue.Queue() @@ -46,7 +58,7 @@ class TestProcessExecutor(object): events.on_success_task_signal.connect(handler) events.on_failure_task_signal.connect(handler) try: - executor.execute(task) + executor.execute(task_ctx) error = queue.get(timeout=60) # tests/resources/plugins/mock-plugin1 is the plugin installed # during this tests setup. The module mock_plugin1 contains a single @@ -63,10 +75,19 @@ class TestProcessExecutor(object): events.on_success_task_signal.disconnect(handler) events.on_failure_task_signal.disconnect(handler) - def test_closed(self, executor): + def test_closed(self, ctx, executor): executor.close() + node = next(ctx.nodes) + task_ctx = put_to_storage_and_get_ctx( + ctx, + models.Task( + function='mock_plugin1.operation', + node=node, + execution=ctx.execution + ) + ) with pytest.raises(RuntimeError) as exc_info: - executor.execute(task=MockTask(function='some.function')) + executor.execute(task_ctx) assert 'closed' in exc_info.value.message @@ -79,14 +100,14 @@ def executor(plugin_manager): @pytest.fixture def mock_plugin(plugin_manager, tmpdir): - source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') + source = os.path.join(resources.DIR, 'plugins', 'mock-plugin1') plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) return plugin_manager.install(source=plugin_path) @pytest.fixture -def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + ctx.states = [] + yield context + storage.release_sqlite_storage(context.model) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 5f0b75f..4ba2670 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -19,7 +19,7 @@ from aria import extension from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine from aria.orchestrator.workflows.executor import process -from aria.orchestrator import workflow, operation +from aria.orchestrator import workflow, operation, workflow_runner import tests from tests import mock @@ -57,8 +57,10 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() + workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__) + context.execution = context.execution + eng = engine.Engine(executor) + eng.execute(context) out = get_node(context).attributes.get('out').value assert out['wrapper_arguments'] == arguments assert out['function_arguments'] == arguments http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 7dbcc5a..0edc009 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -20,7 +20,7 @@ import pytest from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine from aria.orchestrator.workflows.executor import process -from aria.orchestrator import workflow, operation +from aria.orchestrator import workflow, operation, workflow_runner from aria.orchestrator.workflows import exceptions import tests @@ -107,8 +107,10 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() + workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__) + context.execution = context.execution + eng = engine.Engine(executor) + eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') return out.value if out else None