Attach the original traceback to direct runner failures. This makes it much easier to debug pipelines.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fba260d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fba260d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fba260d Branch: refs/heads/master Commit: 5fba260d89040b4325c872c2764a3981ad189df8 Parents: effca63 Author: Robert Bradshaw <rober...@gmail.com> Authored: Thu Feb 9 17:28:32 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Feb 10 13:22:02 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/runners/direct/executor.py | 24 ++++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5fba260d/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 2d4a8bd..ff0d372 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -22,6 +22,7 @@ from __future__ import absolute_import import collections import logging import Queue +import sys import threading import traceback from weakref import WeakValueDictionary @@ -364,7 +365,8 @@ class _ExecutorServiceParallelExecutor(object): update = self.visible_updates.take() try: if update.exception: - raise update.exception + t, v, tb = update.exc_info + raise t, v, tb finally: self.executor_service.shutdown() @@ -426,6 +428,10 @@ class _ExecutorServiceParallelExecutor(object): assert bool(produced_bundle) != bool(exception) self.committed_bundle = produced_bundle self.exception = exception + self.exc_info = sys.exc_info() + if self.exc_info[1] is not exception: + # Not the right exception. 
+ self.exc_info = (exception, None, None) class VisibleExecutorUpdate(object): """An update of interest to the user. @@ -434,9 +440,10 @@ class _ExecutorServiceParallelExecutor(object): raise an exception. """ - def __init__(self, exception=None): - self.finished = exception is not None - self.exception = exception + def __init__(self, exc_info=(None, None, None)): + self.finished = exc_info[0] is not None + self.exception = exc_info[1] or exc_info[0] + self.exc_info = exc_info class _MonitorTask(ExecutorService.CallableTask): """MonitorTask continuously runs to ensure that pipeline makes progress.""" @@ -460,7 +467,7 @@ class _ExecutorServiceParallelExecutor(object): update.exception) self._executor.visible_updates.offer( _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - update.exception)) + update.exc_info)) update = self._executor.all_updates.poll() self._executor.evaluation_context.schedule_pending_unblocked_tasks( self._executor.executor_service) @@ -468,7 +475,8 @@ class _ExecutorServiceParallelExecutor(object): except Exception as e: # pylint: disable=broad-except logging.error('Monitor task died due to exception.\n %s', e) self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e)) + _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( + sys.exc_info())) finally: if not self._should_shutdown(): self._executor.executor_service.submit(self) @@ -502,7 +510,9 @@ class _ExecutorServiceParallelExecutor(object): # Nothing is scheduled for execution, but watermarks incomplete. self._executor.visible_updates.offer( _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - Exception('Monitor task detected a pipeline stall.'))) + (Exception('Monitor task detected a pipeline stall.'), + None, + None))) self._executor.executor_service.shutdown() return True