ARIA-299 Resuming canceled execution with frozen task fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b30a7edd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b30a7edd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b30a7edd

Branch: refs/heads/ARIA-103-remove-clint-dependency
Commit: b30a7edd8a56e21d54e93058b97b3d6162f82fc2
Parents: c46c94b
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jul 5 16:16:39 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 15:20:33 2017 +0300

----------------------------------------------------------------------
 .../workflows/core/events_handler.py            |   4 +
 aria/orchestrator/workflows/executor/base.py    |   2 +-
 aria/orchestrator/workflows/executor/thread.py  |   8 +-
 tests/orchestrator/context/test_serialize.py    |   6 +-
 .../orchestrator/execution_plugin/test_local.py |   6 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   6 +-
 tests/orchestrator/test_workflow_runner.py      | 206 ++++++++++++++++---
 .../workflows/executor/test_process_executor.py |  66 +++---
 ...process_executor_concurrent_modifications.py |   6 +-
 .../executor/test_process_executor_extension.py |   6 +-
 .../test_process_executor_tracked_changes.py    |   6 +-
 11 files changed, 251 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 769c1a8..37801de 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -123,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
+        # Any task that has not ended is put back into the PENDING state
+        for task in execution.tasks:
+            if not task.has_ended():
+                task.status = task.PENDING


 @events.on_cancelling_workflow_signal.connect
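The change above is the heart of the fix: when an execution is resumed, any task that never reached a terminal state - including a task frozen in STARTED by a cancel - is reset to PENDING and therefore re-dispatched. A minimal, self-contained sketch of that invariant (not ARIA code; it assumes, per the hunk above, that has_ended() is true only for terminal statuses):

    PENDING, STARTED, SUCCESS, FAILED = 'pending', 'started', 'success', 'failed'

    class Task(object):
        def __init__(self, status=PENDING):
            self.status = status

        def has_ended(self):
            # terminal statuses only; a task frozen in STARTED has not ended
            return self.status in (SUCCESS, FAILED)

    def workflow_resume(tasks):
        # mirrors _workflow_resume: anything non-terminal goes back to PENDING
        for task in tasks:
            if not task.has_ended():
                task.status = PENDING

    tasks = [Task(SUCCESS), Task(STARTED), Task(PENDING)]
    workflow_resume(tasks)
    assert [t.status for t in tasks] == [SUCCESS, PENDING, PENDING]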
""" - def __init__(self, pool_size=1, *args, **kwargs): + def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs): super(ThreadExecutor, self).__init__(*args, **kwargs) self._stopped = False + self._close_timeout = close_timeout self._queue = Queue.Queue() self._pool = [] for i in range(pool_size): @@ -54,7 +55,10 @@ class ThreadExecutor(BaseExecutor): def close(self): self._stopped = True for thread in self._pool: - thread.join() + if self._close_timeout is None: + thread.join() + else: + thread.join(self._close_timeout) def _processor(self): while not self._stopped: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 6046a16..091e23c 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -87,8 +87,10 @@ def _operation_mapping(): @pytest.fixture def executor(): result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) - yield result - result.close() + try: + yield result + finally: + result.close() @pytest.fixture http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 5b94917..e64e998 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -509,8 +509,10 @@ if __name__ == '__main__': @pytest.fixture def executor(self): result = process.ProcessExecutor() - yield result - result.close() + try: + yield result + finally: + result.close() @pytest.fixture def workflow_context(self, tmpdir): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 4fa8184..a96c91d 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -277,8 +277,10 @@ class TestWithActualSSHServer(object): @pytest.fixture def executor(self): result = process.ProcessExecutor() - yield result - result.close() + try: + yield result + finally: + result.close() @pytest.fixture def workflow_context(self, tmpdir): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index e640c7d..a77d727 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -14,6 +14,7 @@ # limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 6046a16..091e23c 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -87,8 +87,10 @@ def _operation_mapping():
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()


 @pytest.fixture


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5b94917..e64e998 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -509,8 +509,10 @@ if __name__ == '__main__':
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()

     @pytest.fixture
     def workflow_context(self, tmpdir):


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 4fa8184..a96c91d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -277,8 +277,10 @@ class TestWithActualSSHServer(object):
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()

     @pytest.fixture
     def workflow_context(self, tmpdir):
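The same try/finally rewrite is applied to every ProcessExecutor fixture in the remaining test files below. The point of wrapping the yield: if an exception is thrown into a suspended generator (which is how a pytest yield fixture can be torn down when a run is interrupted), code placed after a bare yield never runs, while a finally block still does - so executor.close() is guaranteed. A standalone demonstration of the generator semantics involved:

    def fixture_with_finally():
        try:
            yield 'executor'
        finally:
            print('close() always runs')

    gen = fixture_with_finally()
    next(gen)  # run up to the yield, as pytest does during fixture setup
    try:
        gen.throw(KeyboardInterrupt)  # simulate an interrupted test run
    except KeyboardInterrupt:
        pass  # 'close() always runs' was printed on the way out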
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index e640c7d..a77d727 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
 # limitations under the License.

 import json
+import time
 from threading import Thread, Event
 from datetime import datetime

@@ -23,7 +24,7 @@ import pytest
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
-from aria.orchestrator.events import on_cancelled_workflow_signal
+from aria.orchestrator import events
 from aria.orchestrator.workflow_runner import WorkflowRunner
 from aria.orchestrator.workflows.executor.process import ProcessExecutor
 from aria.orchestrator.workflows import api
@@ -46,9 +47,10 @@ from ..fixtures import (  # pylint: disable=unused-import
     resource_storage as resource
 )

-events = {
+custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
+    'execution_cancelled': Event(),
     'execution_ended': Event()
 }

@@ -57,6 +59,10 @@ class TimeoutError(BaseException):
     pass


+class FailingTask(BaseException):
+    pass
+
+
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in the service
     with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -318,43 +324,57 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,

 class TestResumableWorkflows(object):

-    def test_resume_workflow(self, workflow_context, executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_resuming_task)
-
+    def _create_initial_workflow_runner(
+            self, workflow_context, workflow, executor, inputs=None):
         service = workflow_context.service
         service.workflows['custom_workflow'] = tests_mock.models.create_operation(
             'custom_workflow',
-            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+            operation_kwargs={
+                'function': '{0}.{1}'.format(__name__, workflow.__name__),
+                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+            }
         )
         workflow_context.model.service.update(service)

         wf_runner = WorkflowRunner(
             service_id=workflow_context.service.id,
-            inputs={},
+            inputs=inputs or {},
             model_storage=workflow_context.model,
             resource_storage=workflow_context.resource,
             plugin_manager=None,
             workflow_name='custom_workflow',
             executor=executor)
-
+        return wf_runner
+
+    @staticmethod
+    def _wait_for_active_and_cancel(workflow_runner):
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("is_active wasn't set to True")
+        workflow_runner.cancel()
+        if custom_events['execution_cancelled'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+    def test_resume_workflow(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_resuming_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_parallel_workflow, thread_executor)
+
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
         wf_thread.start()

         # Wait for the execution to start
-        if events['is_active'].wait(5) is False:
-            raise TimeoutError("is_active wasn't set to True")
-        wf_runner.cancel()
-
-        if events['execution_ended'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
+        self._wait_for_active_and_cancel(wf_runner)

         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
-        events['is_resumed'].set()
-        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
+        custom_events['is_resumed'].set()
+        assert any(task.status == task.RETRYING for task in tasks)

         # Create a new workflow runner, with an existing execution id. This would cause
         # the old execution to restart.
@@ -365,7 +385,7 @@ class TestResumableWorkflows(object):
             resource_storage=workflow_context.resource,
             plugin_manager=None,
             execution_id=wf_runner.execution.id,
-            executor=executor)
+            executor=thread_executor)

         new_wf_runner.execute()

@@ -374,9 +394,93 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

+    def test_resume_started_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_stuck_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_single_task_workflow, thread_executor)
+
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner)
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 1
+        assert task.status == task.STARTED
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
+        custom_events['is_resumed'].set()
+
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.SUCCESS
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+    def test_resume_failed_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_single_task_workflow, thread_executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner)
+
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.STARTED
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
+
+        custom_events['is_resumed'].set()
+        assert node.attributes['invocations'].value == 2
+
+        # Create a new workflow runner, with an existing execution id. This would cause
+        # the old execution to restart.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == task.max_attempts - 1
+        assert task.status == task.SUCCESS
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
-    def executor():
+    def thread_executor():
         result = thread.ThreadExecutor()
         try:
             yield result
@@ -417,16 +521,23 @@ class TestResumableWorkflows(object):

     @pytest.fixture(autouse=True)
     def register_to_events(self):
+        def execution_cancelled(*args, **kwargs):
+            custom_events['execution_cancelled'].set()
+
         def execution_ended(*args, **kwargs):
-            events['execution_ended'].set()
+            custom_events['execution_ended'].set()

-        on_cancelled_workflow_signal.connect(execution_ended)
+        events.on_cancelled_workflow_signal.connect(execution_cancelled)
+        events.on_failure_workflow_signal.connect(execution_ended)
         yield
-        on_cancelled_workflow_signal.disconnect(execution_ended)
+        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+        events.on_failure_workflow_signal.disconnect(execution_ended)
+        for event in custom_events.values():
+            event.clear()


 @workflow
-def mock_workflow(ctx, graph):
+def mock_parallel_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
@@ -441,8 +552,51 @@ def mock_resuming_task(ctx):
     ctx.node.attributes['invocations'] += 1

     if ctx.node.attributes['invocations'] != 1:
-        events['is_active'].set()
-        if not events['is_resumed'].isSet():
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
             # if resume was called, increase by one; otherwise fail the execution - the second
             # task should fail as long as it was not part of resuming the workflow
-            raise BaseException("wasn't resumed yet")
+            raise FailingTask("wasn't resumed yet")
+
+
+@workflow
+def mock_single_task_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(node,
+                               interface_name='aria.interfaces.lifecycle',
+                               operation_name='create',
+                               retry_interval=1,
+                               max_attempts=10),
+    )
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+    """
+    The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
+    Overall, the number of invocations should be ctx.task.max_attempts - 1.
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] == 2:
+        custom_events['is_active'].set()
+        # unfreeze the thread only when all of the invocations are done
+        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+            time.sleep(5)
+
+    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+        # pass only just before the end
+        return
+    else:
+        # otherwise, fail
+        raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+    ctx.node.attributes['invocations'] += 1
+    while not custom_events['is_resumed'].isSet():
+        if not custom_events['is_active'].isSet():
+            custom_events['is_active'].set()
+        time.sleep(5)
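These tests coordinate the "frozen task" scenario through the module-level custom_events dictionary: the operation signals is_active and then blocks, the test cancels the execution while the task is still in STARTED, and setting is_resumed releases it. The pattern, reduced to a standalone, runnable sketch:

    import threading
    import time

    custom_events = {'is_active': threading.Event(),
                     'is_resumed': threading.Event()}

    def stuck_task():
        # signal the controller that the task is running, then freeze
        custom_events['is_active'].set()
        while not custom_events['is_resumed'].is_set():
            time.sleep(0.1)

    worker = threading.Thread(target=stuck_task)
    worker.start()

    assert custom_events['is_active'].wait(5)
    # ...here the real test cancels the execution while the task is frozen,
    # then resumes it with a second WorkflowRunner built from execution_id...
    custom_events['is_resumed'].set()
    worker.join()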
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index f8fc567..e050d18 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -43,36 +43,25 @@ from . import MockContext

 class TestProcessExecutor(object):

-    def test_plugin_execution(self, executor, mock_plugin, model):
+    def test_plugin_execution(self, executor, mock_plugin, model, queue):
         ctx = MockContext(
             model,
             task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
         )

-        queue = Queue.Queue()
-
-        def handler(_, exception=None, **kwargs):
-            queue.put(exception)
-
-        events.on_success_task_signal.connect(handler)
-        events.on_failure_task_signal.connect(handler)
-        try:
-            executor.execute(ctx)
-            error = queue.get(timeout=60)
-            # tests/resources/plugins/mock-plugin1 is the plugin installed
-            # during this tests setup. The module mock_plugin1 contains a single
-            # operation named "operation" which calls an entry point defined in the plugin's
-            # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
-            # The "operation" operation that called this subprocess, then raises a RuntimeError
-            # with that subprocess output as the error message.
-            # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
-            # and PATH (entry point) are properly updated in the subprocess in which the task is
-            # running.
-            assert isinstance(error, RuntimeError)
-            assert error.message == 'mock-plugin-output'
-        finally:
-            events.on_success_task_signal.disconnect(handler)
-            events.on_failure_task_signal.disconnect(handler)
+        executor.execute(ctx)
+        error = queue.get(timeout=60)
+        # tests/resources/plugins/mock-plugin1 is the plugin installed
+        # during this test's setup. The module mock_plugin1 contains a single
+        # operation named "operation" which calls an entry point defined in the plugin's
+        # setup.py. This entry point simply prints 'mock-plugin-output' to stdout.
+        # The "operation" operation that called this subprocess then raises a RuntimeError
+        # with that subprocess output as the error message.
+        # This is what we assert here. This test checks that both the PYTHONPATH (operation)
+        # and PATH (entry point) are properly updated in the subprocess in which the task is
+        # running.
+        assert isinstance(error, RuntimeError)
+        assert error.message == 'mock-plugin-output'
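Two details of the rewritten test are worth noting: the signal plumbing now lives in a reusable queue fixture (connected before the test, always disconnected after), and results cross from the task back to the test through a queue read with a timeout, so a task that never reports back fails the test instead of deadlocking it. A stdlib-only sketch of that failure mode, assuming the Python 2 module naming used in the code above ('queue' on Python 3):

    import Queue
    import threading

    results = Queue.Queue()

    def report(exception=None):
        results.put(exception)  # what the fixture's signal handler does

    threading.Thread(target=report,
                     kwargs={'exception': RuntimeError('boom')}).start()
    try:
        error = results.get(timeout=60)  # raises Queue.Empty on silence
        assert isinstance(error, RuntimeError)
    except Queue.Empty:
        raise AssertionError('task never reported back')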

     def test_closed(self, executor, model):
         executor.close()
@@ -127,6 +116,23 @@ while True:
     # making the test more readable
     assert pid not in psutil.pids()

+
+@pytest.fixture
+def queue():
+    _queue = Queue.Queue()
+
+    def handler(_, exception=None, **kwargs):
+        _queue.put(exception)
+
+    events.on_success_task_signal.connect(handler)
+    events.on_failure_task_signal.connect(handler)
+    try:
+        yield _queue
+    finally:
+        events.on_success_task_signal.disconnect(handler)
+        events.on_failure_task_signal.disconnect(handler)
+
+
 @pytest.fixture
 def fs_test_holder(tmpdir):
     dataholder_path = str(tmpdir.join('dataholder'))
@@ -136,11 +142,11 @@ def fs_test_holder(tmpdir):

 @pytest.fixture
 def executor(plugin_manager):
-    result = process.ProcessExecutor(
-        plugin_manager=plugin_manager,
-        python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+    try:
+        yield result
+    finally:
+        result.close()


 @pytest.fixture


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 6163c09..86a2edf 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -120,8 +120,10 @@ def _test(context, executor, lock_files, func, dataholder, expected_failure):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()


 @pytest.fixture


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 6ed3e2b..b26fa43 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -86,8 +86,10 @@ def _mock_operation(ctx, **operation_arguments):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()


 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index a74a473..47ee2f7 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -155,8 +155,10 @@ def _operation_mapping(func):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()


 @pytest.fixture