Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks f29148af4 -> d517b820e (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dd2855..de40fcf 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,12 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from networkx import topological_sort, DiGraph
-
-from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from networkx import topological_sort
+
+from aria.modeling import models
+from aria.orchestrator import (
+    context,
+    workflow_runner
+)
+from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import base
-
 from tests import mock
 from tests import storage
 
@@ -26,8 +29,8 @@ from tests import storage
 def test_task_graph_into_execution_graph(tmpdir):
     interface_name = 'Standard'
     operation_name = 'create'
-    task_context = mock.context.simple(str(tmpdir))
-    node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    workflow_context = mock.context.simple(str(tmpdir))
+    node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     interface = mock.models.create_interface(
         node.service,
         interface_name,
@@ -35,12 +38,12 @@ def test_task_graph_into_execution_graph(tmpdir):
         operation_kwargs=dict(function='test')
     )
     node.interfaces[interface.name] = interface
-    task_context.model.node.update(node)
+    workflow_context.model.node.update(node)
 
     def sub_workflow(name, **_):
         return api.task_graph.TaskGraph(name)
 
-    with context.workflow.current.push(task_context):
+    with context.workflow.current.push(workflow_context):
         test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
         simple_before_task = api.task.OperationTask(
             node,
@@ -65,11 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
     # Direct check
-    execution_graph = DiGraph()
-    core.translation.build_execution_graph(task_graph=test_task_graph,
-                                           execution_graph=execution_graph,
-                                           default_executor=base.StubTaskExecutor())
-    execution_tasks = topological_sort(execution_graph)
+    execution = workflow_context.model.execution.list()[0]
+
+    workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+    workflow_context.execution = execution
+
+    execution_tasks = topological_sort(workflow_context._graph)
 
     assert len(execution_tasks) == 7
 
@@ -83,30 +87,23 @@ def test_task_graph_into_execution_graph(tmpdir):
         '{0}-End'.format(test_task_graph.id)
     ]
 
-    assert expected_tasks_names == execution_tasks
-
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      core.task.StartWorkflowTask)
-
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
-                                  simple_before_task)
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      core.task.StartSubWorkflowTask)
+    assert expected_tasks_names == [t.api_id for t in execution_tasks]
+    assert all(isinstance(task, models.Task) for task in execution_tasks)
+    execution_tasks = iter(execution_tasks)
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
-                                  inner_task)
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
-                      core.task.EndSubWorkflowTask)
+    assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+    assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+    _assert_execution_is_api_task(next(execution_tasks), inner_task)
+    assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+    assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
-                                  simple_after_task)
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
-                      core.task.EndWorkflowTask)
-    storage.release_sqlite_storage(task_context.model)
+    storage.release_sqlite_storage(workflow_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):
-    assert execution_task.id == api_task.id
+    assert execution_task.api_id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index ac6d325..4bc5c54 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,69 +12,62 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import uuid
 import logging
-from collections import namedtuple
-from contextlib import contextmanager
 
 import aria
-from aria.modeling import models
-
-
-class MockTask(object):
-
-    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
-
-    def __init__(self, function, arguments=None, plugin=None, storage=None):
-        self.function = self.name = function
-        self.plugin_fk = plugin.id if plugin else None
-        self.plugin = plugin or None
-        self.arguments = arguments or {}
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        self.logger = logging.getLogger()
-        self.context = MockContext(storage)
-        self.attempts_count = 1
-        self.max_attempts = 1
-        self.ignore_failure = False
-        self.interface_name = 'interface_name'
-        self.operation_name = 'operation_name'
-        self.actor = namedtuple('actor', 'name')(name='actor_name')
-        self.model_task = None
-
-        for state in models.Task.STATES:
-            setattr(self, state.upper(), state)
-
-    @contextmanager
-    def _update(self):
-        yield self
+from aria.orchestrator.context.operation import NodeOperationContext
 
 
 class MockContext(object):
 
-    def __init__(self, storage=None):
+    def __init__(self, storage, **kwargs):
         self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
         self.model = storage
+        task = storage.task.model_cls(**kwargs)
+        self.model.task.put(task)
+        self._task_id = task.id
+        self.states = []
+        self.exception = None
+
+    @property
+    def task(self):
+        return self.model.task.get(self._task_id)
 
     @property
     def serialization_dict(self):
         if self.model:
-            return {'context': self.model.serialization_dict, 'context_cls': self.__class__}
+            context = self.model.serialization_dict
+            context['task_id'] = self.task_id
+            return {'context': context, 'context_cls': self.__class__}
         else:
-            return {'context_cls': self.__class__, 'context': {}}
+            return {'context_cls': self.__class__, 'context': {'task': self.task_id}}
 
     def __getattr__(self, item):
         return None
 
     @classmethod
-    def instantiate_from_dict(cls, **kwargs):
+    def instantiate_from_dict(cls, task_id, **kwargs):
         if kwargs:
-            return cls(storage=aria.application_model_storage(**kwargs))
+            return cls(task_id=task_id, storage=aria.application_model_storage(**kwargs))
         else:
-            return cls()
+            return cls(task=task_id, storage=None)
 
     @staticmethod
     def close():
         pass
+
+
+def put_to_storage_and_get_ctx(ctx, task):
+    ctx.model.task.put(task)
+    op_ctx = NodeOperationContext(
+        model_storage=ctx.model,
+        resource_storage=ctx.resource,
+        workdir=ctx._workdir,
+        task_id=task.id,
+        actor_id=task.actor.id if task.actor else None,
+        service_id=task.execution.service.id,
+        execution_id=task.execution.id,
+        name=task.name
+    )
+    op_ctx.states = []
+    return op_ctx

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3079c60..9d97a26 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -17,6 +17,8 @@
 import pytest
 import retrying
 
+
+
 try:
     import celery as _celery
     app = _celery.Celery()
@@ -25,7 +27,6 @@ except ImportError:
     _celery = None
     app = None
 
-import aria
 from aria.modeling import models
 from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
@@ -35,41 +36,59 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
-from . import MockTask
+from tests import mock, storage
+from . import put_to_storage_and_get_ctx
 
 
 def _get_function(func):
     return '{module}.{func.__name__}'.format(module=__name__, func=func)
 
 
-def execute_and_assert(executor, storage=None):
+def execute_and_assert(executor, ctx):
+    node = ctx.model.node.list()[0]
     expected_value = 'value'
-    successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
-    failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
-    task_with_inputs = MockTask(_get_function(mock_task_with_input),
-                                arguments={'input': models.Argument.wrap('input', 'value')},
-                                storage=storage)
+    successful_ctx = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            function=_get_function(mock_successful_task), node=node, execution=ctx.execution
+        )
+    )
+    failing_ctx = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            function=_get_function(mock_failing_task), node=node, execution=ctx.execution
+        )
+    )
+    ctx_with_inputs = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            node=node,
+            function=_get_function(mock_task_with_input),
+            arguments={'input': models.Argument.wrap('input', 'value')},
+            execution=ctx.execution
+        )
+    )
 
-    for task in [successful_task, failing_task, task_with_inputs]:
-        executor.execute(task)
+    for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
+        executor.execute(op_ctx)
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
-        assert successful_task.states == ['start', 'success']
-        assert failing_task.states == ['start', 'failure']
-        assert task_with_inputs.states == ['start', 'failure']
-        assert isinstance(failing_task.exception, MockException)
-        assert isinstance(task_with_inputs.exception, MockException)
-        assert task_with_inputs.exception.message == expected_value
+        assert successful_ctx.states == ['start', 'success']
+        assert failing_ctx.states == ['start', 'failure']
+        assert ctx_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_ctx.exception, MockException)
+        assert isinstance(ctx_with_inputs.exception, MockException)
+        assert ctx_with_inputs.exception.message == expected_value
     assertion()
 
 
-def test_thread_execute(thread_executor):
-    execute_and_assert(thread_executor)
+def test_thread_execute(thread_executor, ctx):
+    execute_and_assert(thread_executor, ctx)
 
 
-def test_process_execute(process_executor, storage):
-    execute_and_assert(process_executor, storage)
+def test_process_execute(process_executor, ctx):
+    execute_and_assert(process_executor, ctx)
 
 
 def mock_successful_task(**_):
@@ -94,11 +113,11 @@ class MockException(Exception):
 
 
 @pytest.fixture
-def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+def ctx(tmpdir):
+    context = mock.context.simple(str(tmpdir))
+    ctx.states = []
+    yield context
+    storage.release_sqlite_storage(context.model)
 
 
 @pytest.fixture(params=[
@@ -124,15 +143,15 @@ def process_executor():
 
 @pytest.fixture(autouse=True)
 def register_signals():
-    def start_handler(task, *args, **kwargs):
-        task.states.append('start')
+    def start_handler(ctx, *args, **kwargs):
+        ctx.states.append('start')
 
-    def success_handler(task, *args, **kwargs):
-        task.states.append('success')
+    def success_handler(ctx, *args, **kwargs):
+        ctx.states.append('success')
 
-    def failure_handler(task, exception, *args, **kwargs):
-        task.states.append('failure')
-        task.exception = exception
+    def failure_handler(ctx, exception, *args, **kwargs):
+        ctx.states.append('failure')
+        ctx.exception = exception
 
     events.start_task_signal.connect(start_handler)
     events.on_success_task_signal.connect(success_handler)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 058190e..ff2ca2e 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -18,25 +18,37 @@ import Queue
 
 import pytest
 
-import aria
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
+from aria.modeling import models
 
-import tests.storage
-import tests.resources
+from tests import (
+    mock,
+    storage,
+    resources
+)
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
     fs_model as model
 )
-from . import MockTask
+from tests.orchestrator.workflows.executor import put_to_storage_and_get_ctx
 
 
 class TestProcessExecutor(object):
 
-    def test_plugin_execution(self, executor, mock_plugin, storage):
-        task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
+    def test_plugin_execution(self, executor, mock_plugin, ctx):
+        node = next(ctx.nodes)
+        task_ctx = put_to_storage_and_get_ctx(
+            ctx,
+            models.Task(
+                function='mock_plugin1.operation',
+                plugin_fk=mock_plugin.id,
+                node=node,
+                execution=ctx.execution
+            )
+        )
 
         queue = Queue.Queue()
 
@@ -46,7 +58,7 @@ class TestProcessExecutor(object):
         events.on_success_task_signal.connect(handler)
         events.on_failure_task_signal.connect(handler)
         try:
-            executor.execute(task)
+            executor.execute(task_ctx)
             error = queue.get(timeout=60)
             # tests/resources/plugins/mock-plugin1 is the plugin installed
             # during this tests setup. The module mock_plugin1 contains a single
@@ -63,10 +75,19 @@ class TestProcessExecutor(object):
             events.on_success_task_signal.disconnect(handler)
             events.on_failure_task_signal.disconnect(handler)
 
-    def test_closed(self, executor):
+    def test_closed(self, ctx, executor):
         executor.close()
+        node = next(ctx.nodes)
+        task_ctx = put_to_storage_and_get_ctx(
+            ctx,
+            models.Task(
+                function='mock_plugin1.operation',
+                node=node,
+                execution=ctx.execution
+            )
+        )
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=MockTask(function='some.function'))
+            executor.execute(task_ctx)
         assert 'closed' in exc_info.value.message
 
 
@@ -79,14 +100,14 @@ def executor(plugin_manager):
 
 @pytest.fixture
 def mock_plugin(plugin_manager, tmpdir):
-    source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+    source = os.path.join(resources.DIR, 'plugins', 'mock-plugin1')
     plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
     return plugin_manager.install(source=plugin_path)
 
 
 @pytest.fixture
-def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+def ctx(tmpdir):
+    context = mock.context.simple(str(tmpdir))
+    ctx.states = []
+    yield context
+    storage.release_sqlite_storage(context.model)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 5f0b75f..4ba2670 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -19,7 +19,7 @@ from aria import extension
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 
 import tests
 from tests import mock
@@ -57,8 +57,10 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
-    eng.execute()
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    eng = engine.Engine(executor)
+    eng.execute(context)
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments
     assert out['function_arguments'] == arguments

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d517b820/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 7dbcc5a..0edc009 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -20,7 +20,7 @@ import pytest
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 from aria.orchestrator.workflows import exceptions
 
 import tests
@@ -107,8 +107,10 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
-    eng.execute()
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    eng = engine.Engine(executor)
+    eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None
 

Reply via email to