ARIA-156 Better handle exceptions in the process executor

Previously, if an exception was raised during the starting of a task,
the task's process was permanently blocked on receiving a message.

The reason was that the exception caused the 'listener thread' to
not send a response to the task's process, as the exception was not
handled inside the 'with' block of the listener thread.

The first change I introduced was to wrap the yielding of the message and
the response inside a try-except-finally block, so the exception will be
handled within the 'with' scope, and to ensure a response is sent to the
task's process.

The second change is to move the sending of the 'task started' message in
the task's process to a place where encountering an exception will be
handled via sending a 'task failed' message back to the listener thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1cb3086f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1cb3086f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1cb3086f

Branch: refs/heads/ARIA-148-extra-cli-commands
Commit: 1cb3086f38bc35a6a5e588aa5e340cb4fa5cacf5
Parents: 8553977
Author: Avia Efrat <a...@gigaspaces.com>
Authored: Sun May 7 11:42:58 2017 +0300
Committer: Avia Efrat <a...@gigaspaces.com>
Committed: Sun May 7 14:44:01 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cb3086f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index 2378e0a..8481406 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -213,8 +213,13 @@ class ProcessExecutor(base.BaseExecutor):
         with contextlib.closing(self._server_socket.accept()[0]) as connection:
             message = _recv_message(connection)
             response = {}
-            yield message, response
-            _send_message(connection, response)
+            try:
+                yield message, response
+            except BaseException as e:
+                response['exception'] = exceptions.wrap_if_needed(e)
+                raise
+            finally:
+                _send_message(connection, response)
 
     def _handle_task_started_request(self, task_id, **kwargs):
         self._task_started(self._tasks[task_id])
@@ -378,7 +383,6 @@ def _main():
     task_id = arguments['task_id']
     port = arguments['port']
     messenger = _Messenger(task_id=task_id, port=port)
-    messenger.started()
 
     implementation = arguments['implementation']
     operation_inputs = arguments['operation_inputs']
@@ -390,6 +394,7 @@ def _main():
 
     with instrumentation.track_changes() as instrument:
         try:
+            messenger.started()
             ctx = 
context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
             _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
             task_func = imports.load_attribute(implementation)

Reply via email to