[BEAM-1502] GroupByKey should not return bare lists in DirectRunner. Doing so lets pipelines rely on list-only behavior (e.g., calling len() on the grouped values) that does not hold on other runners.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7059e5c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7059e5c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7059e5c Branch: refs/heads/master Commit: e7059e5cb3cd07855582641798c58fc3cf5cd682 Parents: 532256e Author: Robert Bradshaw <rober...@google.com> Authored: Mon Jul 17 13:44:40 2017 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Jul 17 15:08:02 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/transforms/trigger.py | 21 +++++++++++++++----- 3 files changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 3a5f9b1..27b8120 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1136,7 +1136,7 @@ def model_group_by_key(contents, output_path): grouped_words = words_and_counts | beam.GroupByKey() # [END model_group_by_key_transform] (grouped_words - | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) + | 'count words' >> beam.Map(lambda (word, counts): (word, sum(counts))) | beam.io.WriteToText(output_path)) http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 
8018219..92b8737 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1017,7 +1017,7 @@ class CombineValuesDoFn(DoFn): self.combinefn.apply(element[1], *args, **kwargs))] # Add the elements into three accumulators (for testing of merge). - elements = element[1] + elements = list(element[1]) accumulators = [] for k in range(3): if len(elements) <= k: http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index f77fa1a..c1fbfc5 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -24,6 +24,7 @@ from abc import ABCMeta from abc import abstractmethod import collections import copy +import itertools from apache_beam.coders import observable from apache_beam.transforms import combiners @@ -878,6 +879,17 @@ class _UnwindowedValues(observable.ObservableMixin): def __reduce__(self): return list, (list(self),) + def __eq__(self, other): + if isinstance(other, collections.Iterable): + return all( + a == b + for a, b in itertools.izip_longest(self, other, fillvalue=object())) + else: + return NotImplemented + + def __ne__(self, other): + return not self == other + class DefaultGlobalBatchTriggerDriver(TriggerDriver): """Breaks a bundles into window (pane)s according to the default triggering. 
@@ -888,11 +900,10 @@ class DefaultGlobalBatchTriggerDriver(TriggerDriver): pass def process_elements(self, state, windowed_values, unused_output_watermark): - if isinstance(windowed_values, list): - unwindowed = [wv.value for wv in windowed_values] - else: - unwindowed = _UnwindowedValues(windowed_values) - yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE) + yield WindowedValue( + _UnwindowedValues(windowed_values), + MIN_TIMESTAMP, + self.GLOBAL_WINDOW_TUPLE) def process_timer(self, window_id, name, time_domain, timestamp, state): raise TypeError('Triggers never set or called for batch default windowing.')