This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f22231c  [BEAM-7972] Always use Global window in reshuffle and then apply window again.
     new d8c1146  Merge pull request #9334 from angoenka/fix_reshuffle
f22231c is described below

commit f22231c8a650db16524548e063f2c3e26fc48f4c
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Tue Aug 13 15:59:48 2019 -0700

    [BEAM-7972] Always use Global window in reshuffle and then apply window again.
---
 sdks/python/apache_beam/transforms/util.py | 32 ++++++++++++------------------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index c2866d6..801f522 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -594,7 +594,6 @@ class ReshufflePerKey(PTransform):
       # In this (common) case we can use a trivial trigger driver
       # and avoid the (expensive) window param.
       globally_windowed = window.GlobalWindows.windowed_value(None)
-      window_fn = window.GlobalWindows()
       MIN_TIMESTAMP = window.MIN_TIMESTAMP
 
       def reify_timestamps(element, timestamp=DoFn.TimestampParam):
@@ -612,29 +611,24 @@ class ReshufflePerKey(PTransform):
             for (value, timestamp) in values]
 
     else:
-      # The linter is confused.
-      # hash(1) is used to force "runtime" selection of _IdentityWindowFn
-      # pylint: disable=abstract-class-instantiated
-      cls = hash(1) and _IdentityWindowFn
-      window_fn = cls(
-          windowing_saved.windowfn.get_window_coder())
-
-      def reify_timestamps(element, timestamp=DoFn.TimestampParam):
+      def reify_timestamps(element,
+                           timestamp=DoFn.TimestampParam,
+                           window=DoFn.WindowParam):
         key, value = element
-        return key, TimestampedValue(value, timestamp)
+        # Transport the window as part of the value and restore it later.
+        return key, windowed_value.WindowedValue(value, timestamp, [window])
 
-      def restore_timestamps(element, window=DoFn.WindowParam):
-        # Pass the current window since _IdentityWindowFn wouldn't know how
-        # to generate it.
-        key, values = element
-        return [
-            windowed_value.WindowedValue(
-                (key, value.value), value.timestamp, [window])
-            for value in values]
+      def restore_timestamps(element):
+        key, windowed_values = element
+        return [wv.with_value((key, wv.value)) for wv in windowed_values]
 
     ungrouped = pcoll | Map(reify_timestamps)
+
+    # TODO(BEAM-8104) Using global window as one of the standard window.
+    # This is to mitigate the Dataflow Java Runner Harness limitation to
+    # accept only standard coders.
     ungrouped._windowing = Windowing(
-        window_fn,
+        window.GlobalWindows(),
         triggerfn=AfterCount(1),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)

Reply via email to