Repository: beam
Updated Branches:
  refs/heads/master c26d5827b -> cd6802bec


[BEAM-1450] Fix NewDoFn handling of window explosion.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b6fb1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b6fb1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b6fb1a

Branch: refs/heads/master
Commit: 37b6fb1a1c65ff53cef242321eb1ef4ddf48e022
Parents: c26d582
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Feb 9 12:18:48 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Feb 9 15:06:28 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py   |  7 +++++++
 sdks/python/apache_beam/runners/common.pxd |  3 +--
 sdks/python/apache_beam/runners/common.py  | 17 +++++++++++------
 3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 90b1a54..2f188aa 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -327,6 +327,13 @@ class DoFnTest(unittest.TestCase):
              | ParDo(TestDoFn()))
     assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
                                  (7, (0, 10)), (7, (5, 15))]))
+    pcoll2 = pcoll | 'Again' >> ParDo(TestDoFn())
+    assert_that(
+        pcoll2,
+        equal_to([
+            ((1, (-5, 5)), (-5, 5)), ((1, (0, 10)), (0, 10)),
+            ((7, (0, 10)), (0, 10)), ((7, (5, 15)), (5, 15))]),
+        label='doubled windows')
     pipeline.run()
 
   def test_timestamp_param(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f36fdd0..781d96b 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -50,8 +50,7 @@ cdef class DoFnRunner(Receiver):
   cpdef process(self, WindowedValue element)
   cdef _dofn_invoker(self, WindowedValue element)
   cdef _dofn_simple_invoker(self, WindowedValue element)
-  cdef _dofn_window_invoker(
-      self, WindowedValue element, list args, dict kwargs, object window)
+  cdef _dofn_per_window_invoker(self, WindowedValue element)
 
   @cython.locals(windowed_value=WindowedValue)
   cpdef _process_outputs(self, WindowedValue element, results)

http://git-wip-us.apache.org/repos/asf/beam/blob/37b6fb1a/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 50ccf22..6f86ca0 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -197,7 +197,13 @@ class DoFnRunner(Receiver):
   def _dofn_simple_invoker(self, element):
     self._process_outputs(element, self.dofn_process(element.value))
 
-  def _dofn_window_invoker(self, element, args, kwargs, window):
+  def _dofn_per_window_invoker(self, element):
+    if self.has_windowed_inputs:
+      window, = element.windows
+      args, kwargs = util.insert_values_in_args(
+          self.args, self.kwargs, [si[window] for si in self.side_inputs])
+    else:
+      args, kwargs = self.args, self.kwargs
     # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
     for i, p in self.placeholders:
       if p == core.DoFn.ElementParam:
@@ -218,13 +224,12 @@ class DoFnRunner(Receiver):
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_inputs:
+    if self.has_windowed_inputs and len(element.windows) > 1:
       for w in element.windows:
-        args, kwargs = util.insert_values_in_args(
-            self.args, self.kwargs, [si[w] for si in self.side_inputs])
-        self._dofn_window_invoker(element, args, kwargs, w)
+        self._dofn_per_window_invoker(
+            WindowedValue(element.value, element.timestamp, (w,)))
     else:
-      self._dofn_window_invoker(element, self.args, self.kwargs, None)
+      self._dofn_per_window_invoker(element)
 
   def _invoke_bundle_method(self, method):
     try:

Reply via email to