This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new f22231c  [BEAM-7972] Always use Global window in reshuffle and then apply window again.
     new d8c1146  Merge pull request #9334 from angoenka/fix_reshuffle
f22231c is described below

commit f22231c8a650db16524548e063f2c3e26fc48f4c
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Tue Aug 13 15:59:48 2019 -0700

    [BEAM-7972] Always use Global window in reshuffle and then apply window again.
---
 sdks/python/apache_beam/transforms/util.py | 32 ++++++++++++------------------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index c2866d6..801f522 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -594,7 +594,6 @@ class ReshufflePerKey(PTransform):
       # In this (common) case we can use a trivial trigger driver
       # and avoid the (expensive) window param.
       globally_windowed = window.GlobalWindows.windowed_value(None)
-      window_fn = window.GlobalWindows()
       MIN_TIMESTAMP = window.MIN_TIMESTAMP
 
       def reify_timestamps(element, timestamp=DoFn.TimestampParam):
@@ -612,29 +611,24 @@ class ReshufflePerKey(PTransform):
             for (value, timestamp) in values]
 
     else:
-      # The linter is confused.
-      # hash(1) is used to force "runtime" selection of _IdentityWindowFn
-      # pylint: disable=abstract-class-instantiated
-      cls = hash(1) and _IdentityWindowFn
-      window_fn = cls(
-          windowing_saved.windowfn.get_window_coder())
-
-      def reify_timestamps(element, timestamp=DoFn.TimestampParam):
+      def reify_timestamps(element,
+                           timestamp=DoFn.TimestampParam,
+                           window=DoFn.WindowParam):
         key, value = element
-        return key, TimestampedValue(value, timestamp)
+        # Transport the window as part of the value and restore it later.
+        return key, windowed_value.WindowedValue(value, timestamp, [window])
 
-      def restore_timestamps(element, window=DoFn.WindowParam):
-        # Pass the current window since _IdentityWindowFn wouldn't know how
-        # to generate it.
-        key, values = element
-        return [
-            windowed_value.WindowedValue(
-                (key, value.value), value.timestamp, [window])
-            for value in values]
+      def restore_timestamps(element):
+        key, windowed_values = element
+        return [wv.with_value((key, wv.value)) for wv in windowed_values]
 
     ungrouped = pcoll | Map(reify_timestamps)
+
+    # TODO(BEAM-8104) Using global window as one of the standard window.
+    # This is to mitigate the Dataflow Java Runner Harness limitation to
+    # accept only standard coders.
     ungrouped._windowing = Windowing(
-        window_fn,
+        window.GlobalWindows(),
         triggerfn=AfterCount(1),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)