Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 8044a035f -> 979a4b445
wip on executor tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/979a4b44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/979a4b44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/979a4b44 Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 979a4b445bf0261db2bb2632cd93e88ea6d61222 Parents: 8044a03 Author: max-orlov <ma...@gigaspaces.com> Authored: Tue Jun 13 16:49:15 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Tue Jun 13 16:49:15 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/base.py | 3 - aria/orchestrator/workflows/executor/thread.py | 2 + .../orchestrator/workflows/executor/__init__.py | 42 +++--------- .../workflows/executor/test_executor.py | 70 +++++++++++--------- 4 files changed, 49 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 089126d..54a9438 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -42,7 +42,6 @@ class BaseExecutor(logger.LoggerMixin): # itself. 
self._task_started(ctx) self._task_succeeded(ctx) - ctx.model.task.update(ctx.task) def close(self): """ @@ -58,12 +57,10 @@ class BaseExecutor(logger.LoggerMixin): def _task_failed(self, ctx, exception, traceback=None): events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) ctx.model.task.update(ctx.task) - self.close() def _task_succeeded(self, ctx): events.on_success_task_signal.send(ctx) ctx.model.task.update(ctx.task) - self.close() class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 8c447b6..074b54b 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -68,6 +68,8 @@ class ThreadExecutor(BaseExecutor): self._task_failed(ctx, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) + finally: + break # Daemon threads except BaseException as e: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 07fb8ad..b8032b7 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,47 +12,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import uuid import logging -from collections import namedtuple -from contextlib import contextmanager import aria -from aria.modeling import models - - -class MockTask(object): - - INFINITE_RETRIES = models.Task.INFINITE_RETRIES - - def __init__(self, function, arguments=None, plugin=None, storage=None): - self.function = self.name = function - self.plugin_fk = plugin.id if plugin else None - self.plugin = plugin or None - self.arguments = arguments or {} - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - self.logger = logging.getLogger() - self.context = MockContext(self.id, storage) - self.attempts_count = 1 - self.max_attempts = 1 - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.actor = namedtuple('actor', 'name')(name='actor_name') - self.model_task = None - - for state in models.Task.STATES: - setattr(self, state.upper(), state) class MockContext(object): - def __init__(self, task_id, storage=None): + def __init__(self, storage, **kwargs): self.logger = logging.getLogger('mock_logger') - self.task_id = task_id self.model = storage + task = storage.task.model_cls(**kwargs) + self.model.task.put(task) + self._task_id = task.id + self.states = [] + self.exception = None + + @property + def task(self): + return self.model.task.get(self._task_id) @property def serialization_dict(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index cfb6975..3fa75ad 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -17,6 +17,8 @@ import pytest import retrying +from tests import mock, storage + try: import celery as _celery app = 
_celery.Celery() @@ -25,7 +27,6 @@ except ImportError: _celery = None app = None -import aria from aria.modeling import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( @@ -35,41 +36,45 @@ from aria.orchestrator.workflows.executor import ( ) import tests -from . import MockTask +from . import MockContext def _get_function(func): return '{module}.{func.__name__}'.format(module=__name__, func=func) -def execute_and_assert(executor, storage=None): +def execute_and_assert(executor, ctx): + node = ctx.model.node.list()[0] expected_value = 'value' - successful_task = MockTask(_get_function(mock_successful_task), storage=storage) - failing_task = MockTask(_get_function(mock_failing_task), storage=storage) - task_with_inputs = MockTask(_get_function(mock_task_with_input), - arguments={'input': models.Argument.wrap('input', 'value')}, - storage=storage) - - for task in [successful_task, failing_task, task_with_inputs]: - executor.execute(task) + successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task), + node=node, + _executor=executor) + failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node) + ctx_with_inputs = ctx.model.task.model_cls( + node=node, + function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}) + + for task in [successful_ctx, failing_ctx, ctx_with_inputs]: + task.execute(ctx) @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value + assert successful_ctx.states == ['start', 'success'] + assert failing_ctx.states == ['start', 'failure'] + 
assert ctx_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_ctx.exception, MockException) + assert isinstance(ctx_with_inputs.exception, MockException) + assert ctx_with_inputs.exception.message == expected_value assertion() -def test_thread_execute(thread_executor): - execute_and_assert(thread_executor) +def test_thread_execute(thread_executor, ctx): + execute_and_assert(thread_executor, ctx) -def test_process_execute(process_executor, storage): - execute_and_assert(process_executor, storage) +def test_process_execute(process_executor, ctx): + execute_and_assert(process_executor, ctx) def mock_successful_task(**_): @@ -94,11 +99,10 @@ class MockException(Exception): @pytest.fixture -def storage(tmpdir): - return aria.application_model_storage( - aria.storage.sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=str(tmpdir)) - ) +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + yield context + storage.release_sqlite_storage(context.model) @pytest.fixture(params=[ @@ -124,15 +128,15 @@ def process_executor(): @pytest.fixture(autouse=True) def register_signals(): - def start_handler(task, *args, **kwargs): - task.states.append('start') + def start_handler(ctx, *args, **kwargs): + ctx.states.append('start') - def success_handler(task, *args, **kwargs): - task.states.append('success') + def success_handler(ctx, *args, **kwargs): + ctx.states.append('success') - def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception + def failure_handler(ctx, exception, *args, **kwargs): + ctx.states.append('failure') + ctx.exception = exception events.start_task_signal.connect(start_handler) events.on_success_task_signal.connect(success_handler)