Repository: beam
Updated Branches:
  refs/heads/master c26d5827b -> cd6802bec
[BEAM-1450] Fix NewDoFn handling of window explosion.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b6fb1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b6fb1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b6fb1a

Branch: refs/heads/master
Commit: 37b6fb1a1c65ff53cef242321eb1ef4ddf48e022
Parents: c26d582
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Feb 9 12:18:48 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Feb 9 15:06:28 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py   |  7 +++++++
 sdks/python/apache_beam/runners/common.pxd |  3 +--
 sdks/python/apache_beam/runners/common.py  | 17 +++++++++++------
 3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 90b1a54..2f188aa 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -327,6 +327,13 @@ class DoFnTest(unittest.TestCase):
              | ParDo(TestDoFn()))
     assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
                                  (7, (0, 10)), (7, (5, 15))]))
+    pcoll2 = pcoll | 'Again' >> ParDo(TestDoFn())
+    assert_that(
+        pcoll2,
+        equal_to([
+            ((1, (-5, 5)), (-5, 5)), ((1, (0, 10)), (0, 10)),
+            ((7, (0, 10)), (0, 10)), ((7, (5, 15)), (5, 15))]),
+        label='doubled windows')
     pipeline.run()
 
   def test_timestamp_param(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f36fdd0..781d96b 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -50,8 +50,7 @@ cdef class DoFnRunner(Receiver):
   cpdef process(self, WindowedValue element)
   cdef _dofn_invoker(self, WindowedValue element)
   cdef _dofn_simple_invoker(self, WindowedValue element)
-  cdef _dofn_window_invoker(
-      self, WindowedValue element, list args, dict kwargs, object window)
+  cdef _dofn_per_window_invoker(self, WindowedValue element)
 
   @cython.locals(windowed_value=WindowedValue)
   cpdef _process_outputs(self, WindowedValue element, results)

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 50ccf22..6f86ca0 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -197,7 +197,13 @@ class DoFnRunner(Receiver):
   def _dofn_simple_invoker(self, element):
     self._process_outputs(element, self.dofn_process(element.value))
 
-  def _dofn_window_invoker(self, element, args, kwargs, window):
+  def _dofn_per_window_invoker(self, element):
+    if self.has_windowed_inputs:
+      window, = element.windows
+      args, kwargs = util.insert_values_in_args(
+          self.args, self.kwargs, [si[window] for si in self.side_inputs])
+    else:
+      args, kwargs = self.args, self.kwargs
     # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
     for i, p in self.placeholders:
       if p == core.DoFn.ElementParam:
@@ -218,13 +224,12 @@ class DoFnRunner(Receiver):
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_inputs:
+    if self.has_windowed_inputs and len(element.windows) > 1:
       for w in element.windows:
-        args, kwargs = util.insert_values_in_args(
-            self.args, self.kwargs, [si[w] for si in self.side_inputs])
-        self._dofn_window_invoker(element, args, kwargs, w)
+        self._dofn_per_window_invoker(
+            WindowedValue(element.value, element.timestamp, (w,)))
     else:
-      self._dofn_window_invoker(element, self.args, self.kwargs, None)
+      self._dofn_per_window_invoker(element)
 
   def _invoke_bundle_method(self, method):
     try: