Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-139-attributes 936feaf1f -> d06a01657 (forced update)
ARIA-156 Better handle exceptions in the process executor Previously, if an exception was raised during the starting of a task, the task's process was permanently blocked on receiving a message. The reason was that the exception caused the 'listener thread' to not send a response to the task's process, as the exception was not handled inside the 'with' block of the listener thread. The first change I introduced was to wrap the yielding of the message and the response inside a try-except-finally block, so the exception will be handled within the 'with' scope, and to ensure a response is sent to the task's process. The second change is to move the sending of the 'task started' message in the task's process to a place where encountering an exception will be handled via sending a 'task failed' message back to the listener thread. Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1cb3086f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1cb3086f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1cb3086f Branch: refs/heads/ARIA-139-attributes Commit: 1cb3086f38bc35a6a5e588aa5e340cb4fa5cacf5 Parents: 8553977 Author: Avia Efrat <a...@gigaspaces.com> Authored: Sun May 7 11:42:58 2017 +0300 Committer: Avia Efrat <a...@gigaspaces.com> Committed: Sun May 7 14:44:01 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cb3086f/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 2378e0a..8481406 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -213,8 +213,13 @@ class ProcessExecutor(base.BaseExecutor): with contextlib.closing(self._server_socket.accept()[0]) as connection: message = _recv_message(connection) response = {} - yield message, response - _send_message(connection, response) + try: + yield message, response + except BaseException as e: + response['exception'] = exceptions.wrap_if_needed(e) + raise + finally: + _send_message(connection, response) def _handle_task_started_request(self, task_id, **kwargs): self._task_started(self._tasks[task_id]) @@ -378,7 +383,6 @@ def _main(): task_id = arguments['task_id'] port = arguments['port'] messenger = _Messenger(task_id=task_id, port=port) - messenger.started() implementation = arguments['implementation'] operation_inputs = arguments['operation_inputs'] @@ -390,6 +394,7 @@ def _main(): with instrumentation.track_changes() as instrument: try: + messenger.started() ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) task_func = imports.load_attribute(implementation)