Attach original traceback in direct runner failure.

This will make it much easier to debug pipelines.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fba260d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fba260d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fba260d

Branch: refs/heads/master
Commit: 5fba260d89040b4325c872c2764a3981ad189df8
Parents: effca63
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Feb 9 17:28:32 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Feb 10 13:22:02 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/executor.py      | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5fba260d/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py 
b/sdks/python/apache_beam/runners/direct/executor.py
index 2d4a8bd..ff0d372 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
 import collections
 import logging
 import Queue
+import sys
 import threading
 import traceback
 from weakref import WeakValueDictionary
@@ -364,7 +365,8 @@ class _ExecutorServiceParallelExecutor(object):
     update = self.visible_updates.take()
     try:
       if update.exception:
-        raise update.exception
+        t, v, tb = update.exc_info
+        raise t, v, tb
     finally:
       self.executor_service.shutdown()
 
@@ -426,6 +428,10 @@ class _ExecutorServiceParallelExecutor(object):
       assert bool(produced_bundle) != bool(exception)
       self.committed_bundle = produced_bundle
       self.exception = exception
+      self.exc_info = sys.exc_info()
+      if self.exc_info[1] is not exception:
+        # Not the right exception.
+        self.exc_info = (exception, None, None)
 
   class VisibleExecutorUpdate(object):
     """An update of interest to the user.
@@ -434,9 +440,10 @@ class _ExecutorServiceParallelExecutor(object):
     raise an exception.
     """
 
-    def __init__(self, exception=None):
-      self.finished = exception is not None
-      self.exception = exception
+    def __init__(self, exc_info=(None, None, None)):
+      self.finished = exc_info[0] is not None
+      self.exception = exc_info[1] or exc_info[0]
+      self.exc_info = exc_info
 
   class _MonitorTask(ExecutorService.CallableTask):
     """MonitorTask continuously runs to ensure that pipeline makes progress."""
@@ -460,7 +467,7 @@ class _ExecutorServiceParallelExecutor(object):
                             update.exception)
             self._executor.visible_updates.offer(
                 _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-                    update.exception))
+                    update.exc_info))
           update = self._executor.all_updates.poll()
         self._executor.evaluation_context.schedule_pending_unblocked_tasks(
             self._executor.executor_service)
@@ -468,7 +475,8 @@ class _ExecutorServiceParallelExecutor(object):
       except Exception as e:  # pylint: disable=broad-except
         logging.error('Monitor task died due to exception.\n %s', e)
         self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                sys.exc_info()))
       finally:
         if not self._should_shutdown():
           self._executor.executor_service.submit(self)
@@ -502,7 +510,9 @@ class _ExecutorServiceParallelExecutor(object):
           # Nothing is scheduled for execution, but watermarks incomplete.
           self._executor.visible_updates.offer(
               _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-                  Exception('Monitor task detected a pipeline stall.')))
+                  (Exception('Monitor task detected a pipeline stall.'),
+                   None,
+                   None)))
         self._executor.executor_service.shutdown()
         return True
 

Reply via email to