Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-236-Resumable-workflow-executions 608f5d791 -> 6236a4768 (forced update)
ARIA-236 Resumable workflow executions Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/6236a476 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/6236a476 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/6236a476 Branch: refs/heads/ARIA-236-Resumable-workflow-executions Commit: 6236a4768028db7bf2ccaa56a4f6432127ed5d29 Parents: 1fee85c Author: max-orlov <ma...@gigaspaces.com> Authored: Mon Jun 19 17:44:45 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 22 12:31:24 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 57 +++++- aria/cli/logger.py | 4 +- aria/modeling/orchestration.py | 3 +- aria/orchestrator/context/workflow.py | 5 + aria/orchestrator/events.py | 1 + aria/orchestrator/exceptions.py | 7 + aria/orchestrator/workflow_runner.py | 43 +++-- aria/orchestrator/workflows/core/engine.py | 6 +- .../workflows/core/events_handler.py | 7 + tests/mock/__init__.py | 2 +- tests/mock/models.py | 14 +- tests/modeling/test_models.py | 5 +- tests/orchestrator/test_workflow_runner.py | 175 +++++++++++++++++-- 13 files changed, 282 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 6176ea2..b337e84 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -134,18 +134,63 @@ def start(workflow_name, executor = DryExecutor() if dry else None # use WorkflowRunner's default executor workflow_runner = \ - WorkflowRunner(workflow_name, service.id, inputs, - model_storage, resource_storage, plugin_manager, - executor, task_max_attempts, task_retry_interval) + WorkflowRunner( + model_storage, resource_storage, plugin_manager, + service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor, + task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval + ) + logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) + + _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern) + - execution_thread_name = '{0}_{1}'.format(service_name, workflow_name) +@executions.command(name='resume', + short_help='Resume a workflow') +@aria.argument('execution-id') +@aria.options.inputs(help=helptexts.EXECUTION_INPUTS) +@aria.options.dry_execution +@aria.options.task_max_attempts() +@aria.options.task_retry_interval() +@aria.options.mark_pattern() +@aria.options.verbose() +@aria.pass_model_storage +@aria.pass_resource_storage +@aria.pass_plugin_manager +@aria.pass_logger +def resume(execution_id, + dry, + task_max_attempts, + task_retry_interval, + mark_pattern, + model_storage, + resource_storage, + plugin_manager, + logger): + executor = DryExecutor() if dry else None # use WorkflowRunner's default executor + + workflow_runner = \ + WorkflowRunner( + model_storage, resource_storage, plugin_manager, + execution_id=execution_id, executor=executor, + task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval + ) + + logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) + _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern) + + +def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern): + execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name, + workflow_runner.execution.workflow_name) execution_thread = threading.ExceptionThread(target=workflow_runner.execute, name=execution_thread_name) - logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() - log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id) + last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0 + log_iterator = cli_logger.ModelLogIterator(model_storage, + workflow_runner.execution_id, + offset=last_task_id) try: while execution_thread.is_alive(): execution_logging.log_list(log_iterator, mark_pattern=mark_pattern) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/cli/logger.py ---------------------------------------------------------------------- diff --git a/aria/cli/logger.py b/aria/cli/logger.py index 5de3701..96f3fb3 100644 --- a/aria/cli/logger.py +++ b/aria/cli/logger.py @@ -115,8 +115,8 @@ class Logging(object): class ModelLogIterator(object): - def __init__(self, model_storage, execution_id, filters=None, sort=None): - self._last_visited_id = 0 + def __init__(self, model_storage, execution_id, filters=None, sort=None, offset=0): + self._last_visited_id = offset self._model_storage = model_storage self._execution_id = execution_id self._additional_filters = filters or {} http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 17d2476..276b68e 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin): VALID_TRANSITIONS = { PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), - CANCELLING: END_STATES + CANCELLING: END_STATES, + CANCELLED: PENDING } @orm.validates('status') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index aa5a786..adcd635 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -97,10 +97,15 @@ class WorkflowContext(BaseContext): @property def _graph(self): + # Constructing a graph with only not ended nodes if self._execution_graph is None: graph = DiGraph() for task in self.execution.tasks: + if task.has_ended(): + continue for dependency in task.dependencies: + if dependency.has_ended(): + continue graph.add_edge(dependency, task) self._execution_graph = graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/events.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py index a1c4922..aa1b5bc 100644 --- a/aria/orchestrator/events.py +++ b/aria/orchestrator/events.py @@ -34,3 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') +on_resume_workflow_signal = signal('on_resume_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index 8d3dcc6..71b6401 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError): Raised when attempting to import a workflow's code but the implementation is not found """ pass + + +class InvalidWorkflowRunnerParams(AriaError): + """ + Raised when invalid combination of arguments is passed to the workflow runner + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 9e6b3ad..3ccb1ee 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -37,9 +37,9 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): - def __init__(self, workflow_name, service_id, inputs, - model_storage, resource_storage, plugin_manager, - executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + def __init__(self, model_storage, resource_storage, plugin_manager, + execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, + task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ Manages a single workflow execution on a given service. @@ -55,28 +55,36 @@ class WorkflowRunner(object): :param task_retry_interval: Retry interval in between retry attempts of a failing task """ + if not (execution_id or (workflow_name and service_id)): + exceptions.InvalidWorkflowRunnerParams( + "Either provide execution id in order to resume a workflow or workflow name " + "and service id with inputs") + + self._is_resume = execution_id is not None + self._model_storage = model_storage self._resource_storage = resource_storage - self._workflow_name = workflow_name # the IDs are stored rather than the models themselves, so this module could be used # by several threads without raising errors on model objects shared between threads - self._service_id = service_id - - self._validate_workflow_exists_for_service() - workflow_fn = self._get_workflow_fn() - - execution = self._create_execution_model(inputs) - self._execution_id = execution.id + if self._is_resume: + self._execution_id = execution_id + self._service_id = self.execution.service.id + self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name + else: + self._service_id = service_id + self._workflow_name = workflow_name + self._validate_workflow_exists_for_service() + self._execution_id = self._create_execution_model(inputs).id self._workflow_context = WorkflowContext( name=self.__class__.__name__, model_storage=self._model_storage, resource_storage=resource_storage, service_id=service_id, - execution_id=execution.id, - workflow_name=workflow_name, + execution_id=self._execution_id, + workflow_name=self._workflow_name, task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval) @@ -86,9 +94,10 @@ class WorkflowRunner(object): # transforming the execution inputs to dict, to pass them to the workflow function execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) - self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compile.create_execution_tasks( - self._workflow_context, self._tasks_graph, executor.__class__) + if not self._is_resume: + workflow_fn = self._get_workflow_fn() + tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) + compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__) self._engine = engine.Engine(executors={executor.__class__: executor}) @@ -105,7 +114,7 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(ctx=self._workflow_context) + self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 9f0ddd7..d5a6e70 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,11 +41,15 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx): + def execute(self, ctx, resuming=False): """ execute the workflow """ executing_tasks = [] + + if resuming: + events.on_resume_workflow_signal.send(ctx) + try: events.start_workflow_signal.send(ctx) while True: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 2d71d2a..7380db8 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): execution.ended_at = datetime.utcnow() +@events.on_resume_workflow_signal.connect +def _workflow_resume(workflow_context, *args, **kwargs): + with workflow_context.persist_changes: + execution = workflow_context.execution + execution.status = execution.PENDING + + @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): with workflow_context.persist_changes: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/tests/mock/__init__.py ---------------------------------------------------------------------- diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py index 9004b4c..9183b77 100644 --- a/tests/mock/__init__.py +++ b/tests/mock/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . import models, context, topology, operations +from . import models, context, topology, operations, workflow http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 7f6bbea..23a14bd 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name, ) -def create_interface(service, interface_name, operation_name, operation_kwargs=None, - interface_kwargs=None): - the_type = service.service_template.interface_types.get_descendant('test_interface_type') - +def create_operation(operation_name, operation_kwargs=None): if operation_kwargs and operation_kwargs.get('arguments'): operation_kwargs['arguments'] = dict( (argument_name, models.Argument.wrap(argument_name, argument_value)) for argument_name, argument_value in operation_kwargs['arguments'].iteritems() if argument_value is not None) - operation = models.Operation( + return models.Operation( name=operation_name, **(operation_kwargs or {}) ) + + +def create_interface(service, interface_name, operation_name, operation_kwargs=None, + interface_kwargs=None): + the_type = service.service_template.interface_types.get_descendant('test_interface_type') + operation = create_operation(operation_name, operation_kwargs) + return models.Interface( type=the_type, operations=_dictify(operation), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index 464f432..bbc7352 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -314,7 +314,7 @@ class TestExecution(object): Execution.CANCELLING], Execution.FAILED: [Execution.FAILED], Execution.SUCCEEDED: [Execution.SUCCEEDED], - Execution.CANCELLED: [Execution.CANCELLED] + Execution.CANCELLED: [Execution.CANCELLED, Execution.PENDING] } invalid_transitions = { @@ -334,8 +334,7 @@ class TestExecution(object): Execution.FAILED, Execution.CANCELLED, Execution.CANCELLING], - Execution.CANCELLED: [Execution.PENDING, - Execution.STARTED, + Execution.CANCELLED: [Execution.STARTED, Execution.FAILED, Execution.SUCCEEDED, Execution.CANCELLING], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6236a476/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 40f9035..ae82476 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -14,21 +14,31 @@ # limitations under the License. import json +from threading import Thread, Event from datetime import datetime -import pytest import mock +import pytest from aria.modeling import exceptions as modeling_exceptions from aria.modeling import models from aria.orchestrator import exceptions +from aria.orchestrator.events import on_cancelled_workflow_signal from aria.orchestrator.workflow_runner import WorkflowRunner from aria.orchestrator.workflows.executor.process import ProcessExecutor +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.executor import thread +from aria.orchestrator import ( + workflow, + operation, +) -from ..mock import ( - topology, - workflow as workflow_mocks +from tests import ( + mock as tests_mock, + storage ) + from ..fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, @@ -36,6 +46,16 @@ from ..fixtures import ( # pylint: disable=unused-import resource_storage as resource ) +events = { + 'is_resumed': Event(), + 'is_active': Event(), + 'execution_ended': Event() +} + + +class TimeoutError(BaseException): + pass + def test_undeclared_workflow(request): # validating a proper error is raised when the workflow is not declared in the service @@ -59,8 +79,8 @@ def test_builtin_workflow_instantiation(request): # validates the workflow runner instantiates properly when provided with a builtin workflow # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) workflow_runner = _create_workflow_runner(request, 'install') - tasks = list(workflow_runner._tasks_graph.tasks) - assert len(tasks) == 2 # expecting two WorkflowTasks + tasks = list(workflow_runner.execution.tasks) + assert len(tasks) == 18 # expecting 18 tasks for 2 node topology def test_custom_workflow_instantiation(request): @@ -68,8 +88,8 @@ def test_custom_workflow_instantiation(request): # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) mock_workflow = _setup_mock_workflow_in_service(request) workflow_runner = _create_workflow_runner(request, mock_workflow) - tasks = list(workflow_runner._tasks_graph.tasks) - assert len(tasks) == 0 # mock workflow creates no tasks + tasks = list(workflow_runner.execution.tasks) + assert len(tasks) == 2 # mock workflow creates only start workflow and end workflow task def test_existing_active_executions(request, service, model): @@ -139,7 +159,8 @@ def test_execute(request, service): assert engine_kwargs['ctx'].service.id == service.id assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' - mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context) + mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context, + resuming=False) def test_cancel_execution(request): @@ -240,7 +261,7 @@ def test_workflow_function_parameters(request, tmpdir): @pytest.fixture def service(model): # sets up a service in the storage - service_id = topology.create_simple_topology_two_nodes(model) + service_id = tests_mock.topology.create_simple_topology_two_nodes(model) service = model.service.get(service_id) return service @@ -251,7 +272,7 @@ def _setup_mock_workflow_in_service(request, inputs=None): service = request.getfuncargvalue('service') resource = request.getfuncargvalue('resource') - source = workflow_mocks.__file__ + source = tests_mock.workflow.__file__ resource.service_template.upload(str(service.service_template.id), source) mock_workflow_name = 'test_workflow' arguments = {} @@ -293,3 +314,135 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None, resource_storage=resource, plugin_manager=plugin_manager, **task_configuration_kwargs) + + +class TestResumableWorkflows(object): + + def test_resume_workflow(self, workflow_context, executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_resuming_task) + + service = workflow_context.service + service.workflows['custom_workflow'] = tests_mock.models.create_operation( + 'custom_workflow', + operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)} + ) + workflow_context.model.service.update(service) + + wf_runner = WorkflowRunner( + service_id=workflow_context.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + workflow_name='custom_workflow', + executor=executor) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.daemon = True + wf_thread.start() + + # Wait for the execution to start + if events['is_active'].wait(5) is False: + raise TimeoutError("is_active wasn't set to True") + wf_runner.cancel() + + if events['execution_ended'].wait(60) is False: + raise TimeoutError("Execution did not end") + + first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None}) + assert first_task.status == first_task.SUCCESS + assert second_task.status in (second_task.FAILED, second_task.RETRYING) + events['is_resumed'].set() + assert second_task.status in (second_task.FAILED, second_task.RETRYING) + + # Create a new workflow runner, with an existing execution id. This would cause + # the old execution to restart. + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=executor) + + new_wf_runner.execute() + + # Wait for it to finish and assert changes. + assert second_task.status == second_task.SUCCESS + assert node.attributes['invocations'].value == 3 + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + + @staticmethod + @pytest.fixture + def executor(): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @staticmethod + @pytest.fixture + def workflow_context(tmpdir): + workflow_context = tests_mock.context.simple(str(tmpdir)) + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + @staticmethod + def _create_interface(ctx, node, func, arguments=None): + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = tests_mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + execution = workflow_context.execution + compile.create_execution_tasks(execution, graph, executor.__class__) + workflow_context.execution = execution + + return engine.Engine(executors={executor.__class__: executor}) + + @pytest.fixture(autouse=True) + def register_to_events(self): + def execution_ended(*args, **kwargs): + events['execution_ended'].set() + + on_cancelled_workflow_signal.connect(execution_ended) + yield + on_cancelled_workflow_signal.disconnect(execution_ended) + + +@workflow +def mock_workflow(ctx, graph): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.add_tasks( + api.task.OperationTask( + node, interface_name='aria.interfaces.lifecycle', operation_name='create'), + api.task.OperationTask( + node, interface_name='aria.interfaces.lifecycle', operation_name='create') + ) + + +@operation +def mock_resuming_task(ctx): + ctx.node.attributes['invocations'] += 1 + + if ctx.node.attributes['invocations'] != 1: + events['is_active'].set() + if not events['is_resumed'].isSet(): + # if resume was called, increase by one. o/w fail the execution - second task should + # fail as long it was not a part of resuming the workflow + raise BaseException("wasn't resumed yet")