Properly handle side input exception when all reader threads complete
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6779b8ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6779b8ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6779b8ec

Branch: refs/heads/tez-runner
Commit: 6779b8ec0e872de86ed13fdfc9b260f69f44dfab
Parents: a0eb00e
Author: Charles Chen <c...@google.com>
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: chamik...@google.com <chamik...@google.com>
Committed: Fri Nov 10 15:21:15 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py      |  3 +++
 .../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..c91fe95 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
       self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
 
   def __iter__(self):
+    # pylint: disable=too-many-nested-blocks
     if self.already_iterated:
       raise RuntimeError(
           'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -132,6 +133,8 @@ class PrefetchingSourceSetIterable(object):
           if element is READER_THREAD_IS_DONE_SENTINEL:
             num_readers_finished += 1
             if num_readers_finished == self.num_reader_threads:
+              if self.has_errored:
+                raise self.reader_exceptions.get()
               return
           elif self.has_errored:
             raise self.reader_exceptions.get()
http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..73d34fb 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=1)
     assert list(strip_windows(iterator_fn())) == range(11)
 
+  def test_source_iterator_single_source_exception(self):
+    class MyException(Exception):
+      pass
+
+    def exception_generator():
+      yield 0
+      raise MyException('I am an exception!')
+
+    sources = [
+        FakeSource(exception_generator()),
+    ]
+    iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+    seen = set()
+    with self.assertRaises(MyException):
+      for value in iterator_fn():
+        seen.add(value.value)
+    self.assertEqual(sorted(seen), [0])
+
   def test_source_iterator_fn_exception(self):
     class MyException(Exception):
       pass