ARIA-299 Resuming canceled execution with frozen task fails

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b30a7edd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b30a7edd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b30a7edd

Branch: refs/heads/ARIA-103-remove-clint-dependency
Commit: b30a7edd8a56e21d54e93058b97b3d6162f82fc2
Parents: c46c94b
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jul 5 16:16:39 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 15:20:33 2017 +0300

----------------------------------------------------------------------
 .../workflows/core/events_handler.py            |   4 +
 aria/orchestrator/workflows/executor/base.py    |   2 +-
 aria/orchestrator/workflows/executor/thread.py  |   8 +-
 tests/orchestrator/context/test_serialize.py    |   6 +-
 .../orchestrator/execution_plugin/test_local.py |   6 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   6 +-
 tests/orchestrator/test_workflow_runner.py      | 206 ++++++++++++++++---
 .../workflows/executor/test_process_executor.py |  66 +++---
 ...process_executor_concurrent_modifications.py |   6 +-
 .../executor/test_process_executor_extension.py |   6 +-
 .../test_process_executor_tracked_changes.py    |   6 +-
 11 files changed, 251 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 769c1a8..37801de 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -123,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
+        # Any non ended task would be put back to pending state
+        for task in execution.tasks:
+            if not task.has_ended():
+                task.status = task.PENDING
 
 
 @events.on_cancelling_workflow_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index ec1a0c7..e7d03ea 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -49,7 +49,7 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
-    def terminate(self, ctx):
+    def terminate(self, task_id):
         """
         Terminate the executing task
         :return:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 26484dc..170620e 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -36,9 +36,10 @@ class ThreadExecutor(BaseExecutor):
     Note: This executor is incapable of running plugin operations.
     """
 
-    def __init__(self, pool_size=1, *args, **kwargs):
+    def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
         super(ThreadExecutor, self).__init__(*args, **kwargs)
         self._stopped = False
+        self._close_timeout = close_timeout
         self._queue = Queue.Queue()
         self._pool = []
         for i in range(pool_size):
@@ -54,7 +55,10 @@ class ThreadExecutor(BaseExecutor):
     def close(self):
         self._stopped = True
         for thread in self._pool:
-            thread.join()
+            if self._close_timeout is None:
+                thread.join()
+            else:
+                thread.join(self._close_timeout)
 
     def _processor(self):
         while not self._stopped:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 6046a16..091e23c 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -87,8 +87,10 @@ def _operation_mapping():
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5b94917..e64e998 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -509,8 +509,10 @@ if __name__ == '__main__':
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 4fa8184..a96c91d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -277,8 +277,10 @@ class TestWithActualSSHServer(object):
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index e640c7d..a77d727 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import json
+import time
 from threading import Thread, Event
 from datetime import datetime
 
@@ -23,7 +24,7 @@ import pytest
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
-from aria.orchestrator.events import on_cancelled_workflow_signal
+from aria.orchestrator import events
 from aria.orchestrator.workflow_runner import WorkflowRunner
 from aria.orchestrator.workflows.executor.process import ProcessExecutor
 from aria.orchestrator.workflows import api
@@ -46,9 +47,10 @@ from ..fixtures import (  # pylint: disable=unused-import
     resource_storage as resource
 )
 
-events = {
+custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
+    'execution_cancelled': Event(),
     'execution_ended': Event()
 }
 
@@ -57,6 +59,10 @@ class TimeoutError(BaseException):
     pass
 
 
+class FailingTask(BaseException):
+    pass
+
+
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in the service
     with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -318,43 +324,57 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
 
 class TestResumableWorkflows(object):
 
-    def test_resume_workflow(self, workflow_context, executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_resuming_task)
+    def _create_initial_workflow_runner(
+            self, workflow_context, workflow, executor, inputs=None):
 
         service = workflow_context.service
         service.workflows['custom_workflow'] = tests_mock.models.create_operation(
             'custom_workflow',
-            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+            operation_kwargs={
+                'function': '{0}.{1}'.format(__name__, workflow.__name__),
+                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+            }
         )
         workflow_context.model.service.update(service)
 
         wf_runner = WorkflowRunner(
             service_id=workflow_context.service.id,
-            inputs={},
+            inputs=inputs or {},
             model_storage=workflow_context.model,
             resource_storage=workflow_context.resource,
             plugin_manager=None,
             workflow_name='custom_workflow',
             executor=executor)
+        return wf_runner
+
+    @staticmethod
+    def _wait_for_active_and_cancel(workflow_runner):
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("is_active wasn't set to True")
+        workflow_runner.cancel()
+        if custom_events['execution_cancelled'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+    def test_resume_workflow(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_resuming_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_parallel_workflow, thread_executor)
+
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
         wf_thread.start()
 
         # Wait for the execution to start
-        if events['is_active'].wait(5) is False:
-            raise TimeoutError("is_active wasn't set to True")
-        wf_runner.cancel()
-
-        if events['execution_ended'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
+        self._wait_for_active_and_cancel(wf_runner)
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
-        events['is_resumed'].set()
-        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
+        custom_events['is_resumed'].set()
+        assert any(task.status == task.RETRYING for task in tasks)
 
         # Create a new workflow runner, with an existing execution id. This would cause
         # the old execution to restart.
@@ -365,7 +385,7 @@ class TestResumableWorkflows(object):
             resource_storage=workflow_context.resource,
             plugin_manager=None,
             execution_id=wf_runner.execution.id,
-            executor=executor)
+            executor=thread_executor)
 
         new_wf_runner.execute()
 
@@ -374,9 +394,93 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_started_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_stuck_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_single_task_workflow, thread_executor)
+
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner)
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 1
+        assert task.status == task.STARTED
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
+        custom_events['is_resumed'].set()
+
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.SUCCESS
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+    def test_resume_failed_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_single_task_workflow, thread_executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner)
+
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.STARTED
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
+
+        custom_events['is_resumed'].set()
+        assert node.attributes['invocations'].value == 2
+
+        # Create a new workflow runner, with an existing execution id. This would cause
+        # the old execution to restart.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == task.max_attempts - 1
+        assert task.status == task.SUCCESS
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
-    def executor():
+    def thread_executor():
         result = thread.ThreadExecutor()
         try:
             yield result
@@ -417,16 +521,23 @@ class TestResumableWorkflows(object):
 
     @pytest.fixture(autouse=True)
     def register_to_events(self):
+        def execution_cancelled(*args, **kwargs):
+            custom_events['execution_cancelled'].set()
+
         def execution_ended(*args, **kwargs):
-            events['execution_ended'].set()
+            custom_events['execution_ended'].set()
 
-        on_cancelled_workflow_signal.connect(execution_ended)
+        events.on_cancelled_workflow_signal.connect(execution_cancelled)
+        events.on_failure_workflow_signal.connect(execution_ended)
         yield
-        on_cancelled_workflow_signal.disconnect(execution_ended)
+        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+        events.on_failure_workflow_signal.disconnect(execution_ended)
+        for event in custom_events.values():
+            event.clear()
 
 
 @workflow
-def mock_workflow(ctx, graph):
+def mock_parallel_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
@@ -441,8 +552,51 @@ def mock_resuming_task(ctx):
     ctx.node.attributes['invocations'] += 1
 
     if ctx.node.attributes['invocations'] != 1:
-        events['is_active'].set()
-        if not events['is_resumed'].isSet():
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
             # if resume was called, increase by one. o/w fail the execution - second task should
             # fail as long it was not a part of resuming the workflow
-            raise BaseException("wasn't resumed yet")
+            raise FailingTask("wasn't resumed yet")
+
+
+@workflow
+def mock_single_task_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(node,
+                               interface_name='aria.interfaces.lifecycle',
+                               operation_name='create',
+                               retry_interval=1,
+                               max_attempts=10),
+    )
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+    """
+    The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
+    overall, the number of invocations should be ctx.task.max_attempts - 1
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] == 2:
+        custom_events['is_active'].set()
+        # unfreeze the thread only when all of the invocations are done
+        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+            time.sleep(5)
+
+    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+        # pass only just before the end.
+        return
+    else:
+        # fail o.w.
+        raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+    ctx.node.attributes['invocations'] += 1
+    while not custom_events['is_resumed'].isSet():
+        if not custom_events['is_active'].isSet():
+            custom_events['is_active'].set()
+        time.sleep(5)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index f8fc567..e050d18 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -43,36 +43,25 @@ from . import MockContext
 
 class TestProcessExecutor(object):
 
-    def test_plugin_execution(self, executor, mock_plugin, model):
+    def test_plugin_execution(self, executor, mock_plugin, model, queue):
         ctx = MockContext(
             model,
             task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
         )
 
-        queue = Queue.Queue()
-
-        def handler(_, exception=None, **kwargs):
-            queue.put(exception)
-
-        events.on_success_task_signal.connect(handler)
-        events.on_failure_task_signal.connect(handler)
-        try:
-            executor.execute(ctx)
-            error = queue.get(timeout=60)
-            # tests/resources/plugins/mock-plugin1 is the plugin installed
-            # during this tests setup. The module mock_plugin1 contains a single
-            # operation named "operation" which calls an entry point defined in the plugin's
-            # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
-            # The "operation" operation that called this subprocess, then raises a RuntimeError
-            # with that subprocess output as the error message.
-            # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
-            # and PATH (entry point) are properly updated in the subprocess in which the task is
-            # running.
-            assert isinstance(error, RuntimeError)
-            assert error.message == 'mock-plugin-output'
-        finally:
-            events.on_success_task_signal.disconnect(handler)
-            events.on_failure_task_signal.disconnect(handler)
+        executor.execute(ctx)
+        error = queue.get(timeout=60)
+        # tests/resources/plugins/mock-plugin1 is the plugin installed
+        # during this tests setup. The module mock_plugin1 contains a single
+        # operation named "operation" which calls an entry point defined in the plugin's
+        # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
+        # The "operation" operation that called this subprocess, then raises a RuntimeError
+        # with that subprocess output as the error message.
+        # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
+        # and PATH (entry point) are properly updated in the subprocess in which the task is
+        # running.
+        assert isinstance(error, RuntimeError)
+        assert error.message == 'mock-plugin-output'
 
     def test_closed(self, executor, model):
         executor.close()
@@ -127,6 +116,23 @@ while True:
                 # making the test more readable
                 assert pid not in psutil.pids()
 
+
+@pytest.fixture
+def queue():
+    _queue = Queue.Queue()
+
+    def handler(_, exception=None, **kwargs):
+        _queue.put(exception)
+
+    events.on_success_task_signal.connect(handler)
+    events.on_failure_task_signal.connect(handler)
+    try:
+        yield _queue
+    finally:
+        events.on_success_task_signal.disconnect(handler)
+        events.on_failure_task_signal.disconnect(handler)
+
+
 @pytest.fixture
 def fs_test_holder(tmpdir):
     dataholder_path = str(tmpdir.join('dataholder'))
@@ -136,11 +142,11 @@ def fs_test_holder(tmpdir):
 
 @pytest.fixture
 def executor(plugin_manager):
-    result = process.ProcessExecutor(
-        plugin_manager=plugin_manager,
-        python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 6163c09..86a2edf 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -120,8 +120,10 @@ def _test(context, executor, lock_files, func, dataholder, expected_failure):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 6ed3e2b..b26fa43 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -86,8 +86,10 @@ def _mock_operation(ctx, **operation_arguments):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b30a7edd/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index a74a473..47ee2f7 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -155,8 +155,10 @@ def _operation_mapping(func):
 @pytest.fixture
 def executor():
     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture

Reply via email to