Fix remaining style issues from the auto-conversion; switch the pylint ref to stage 1 (stages in futurize are apparently 1-indexed)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd702b3e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd702b3e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd702b3e Branch: refs/heads/master Commit: cd702b3e7ea81fe41952066cfe9a1892d5433bd9 Parents: f364248 Author: Holden Karau <hol...@us.ibm.com> Authored: Fri Sep 1 21:38:47 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Oct 12 15:50:08 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/complete/autocomplete.py | 5 +++-- .../examples/complete/game/leader_board.py | 6 ++++-- .../apache_beam/examples/complete/game/user_score.py | 6 ++++-- .../examples/complete/juliaset/juliaset/juliaset.py | 5 ++++- sdks/python/apache_beam/examples/complete/tfidf.py | 6 ++++-- .../apache_beam/examples/complete/tfidf_test.py | 5 ++++- .../examples/cookbook/datastore_wordcount.py | 10 ++++++++-- .../apache_beam/examples/cookbook/mergecontacts.py | 15 ++++++++++++--- .../examples/cookbook/multiple_output_pardo.py | 10 ++++++++-- .../python/apache_beam/examples/snippets/snippets.py | 15 ++++++++++++--- .../apache_beam/examples/streaming_wordcount.py | 5 ++++- .../apache_beam/examples/windowed_wordcount.py | 5 ++++- sdks/python/apache_beam/examples/wordcount.py | 10 ++++++++-- .../apache_beam/examples/wordcount_debugging.py | 10 ++++++++-- .../python/apache_beam/examples/wordcount_minimal.py | 5 ++++- .../runners/portability/maptask_executor_runner.py | 6 +++++- sdks/python/apache_beam/transforms/trigger_test.py | 10 ++++++++-- 17 files changed, 104 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/autocomplete.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index 81c5351..c09b78e 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -45,13 +45,14 @@ def run(argv=None): pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True with beam.Pipeline(options=pipeline_options) as p: + def format_result(prefix_candidates): + return '%s: %s' % (prefix_candidates[0], prefix_candidates[1]) (p # pylint: disable=expression-not-assigned | 'read' >> ReadFromText(known_args.input) | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) | 'TopPerPrefix' >> TopPerPrefix(5) - | 'format' >> beam.Map( - lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], prefix_candidates[1])) + | 'format' >> beam.Map(format_result) | 'write' >> WriteToText(known_args.output)) http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/leader_board.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py index 6fc7b5d..c1b15e9 100644 --- a/sdks/python/apache_beam/examples/complete/game/leader_board.py +++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py @@ -330,10 +330,12 @@ def run(argv=None): })) # Get user scores and write the results to BigQuery + def format_user_score_sums(user_score): + return {'user': user_score[0], 'total_score': user_score[1]} + (events # pylint: disable=expression-not-assigned | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness) - | 'FormatUserScoreSums' >> beam.Map( - lambda user_score: {'user': user_score[0], 'total_score': user_score[1]}) + | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums) | 
'WriteUserScoreSums' >> WriteToBigQuery( args.table_name + '_users', args.dataset, { 'user': 'STRING', http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/user_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py index 5e093bb..0405bab 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score.py @@ -138,11 +138,13 @@ def run(argv=None): args, pipeline_args = parser.parse_known_args(argv) with beam.Pipeline(argv=pipeline_args) as p: + def format_user_score_sums(user_score): + return 'user: %s, total_score: %s' % (user_score[0], user_score[1]) + (p # pylint: disable=expression-not-assigned | 'ReadInputText' >> beam.io.ReadFromText(args.input) | 'UserScore' >> UserScore() - | 'FormatUserScoreSums' >> beam.Map( - lambda user_score: 'user: %s, total_score: %s' % (user_score[0], user_score[1])) + | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums) | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)) # [END main] http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 1013168..bb5b185 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -107,8 +107,11 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates # to the output file with an x-coordinate grouping per line. 
# pylint: disable=expression-not-assigned + def x_coord_key(x_y_i): + return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2])) + (coordinates - | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))) + | 'x coord key' >> beam.Map(x_coord_key) | 'x coord' >> beam.GroupByKey() | 'format' >> beam.Map( lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1])) http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 0300505..55404df 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -100,10 +100,12 @@ class TfIdf(beam.PTransform): # Adjust the above collection to a mapping from (URI, word) pairs to counts # into an isomorphic mapping from URI to (word, count) pairs, to prepare # for a join by the URI key. 
+ def shift_keys(uri_word_count): + return (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1])) + uri_to_word_and_count = ( uri_and_word_to_count - | 'ShiftKeys' >> beam.Map( - lambda uri_word_count: (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1])))) + | 'ShiftKeys' >> beam.Map(shift_keys)) # Perform a CoGroupByKey (a sort of pre-join) on the prepared # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 957f4c7..71e71e3 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -51,6 +51,9 @@ class TfIdfTest(unittest.TestCase): def test_tfidf_transform(self): with TestPipeline() as p: + def re_key(word_uri_tfidf): + return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1]) + uri_to_line = p | 'create sample' >> beam.Create( [('1.txt', 'abc def ghi'), ('2.txt', 'abc def'), @@ -58,7 +61,7 @@ class TfIdfTest(unittest.TestCase): result = ( uri_to_line | tfidf.TfIdf() - | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1]))) + | beam.Map(re_key)) assert_that(result, equal_to(EXPECTED_RESULTS)) # Run the pipeline. Note that the assert_that above adds to the pipeline # a check that the result PCollection contains expected values. 
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 13c5998..03fcd8a 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -179,15 +179,21 @@ def read_from_datastore(project, user_options, pipeline_options): project, query, user_options.namespace) # Count the occurrences of each word. + def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))) + | 'count' >> beam.Map(count_ones)) # Format the counts into a PCollection of strings. - output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])) + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + + output = counts | 'format' >> beam.Map(format_result) # Write the output using a "Write" transform that has side effects. 
# pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 5a35e51..fdfa286 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -103,12 +103,21 @@ def run(argv=None, assert_results=None): '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')])) # Compute some stats about our database of people. + def without_email(name_email_phone_snailmail1): + return not next(iter(name_email_phone_snailmail1[1][0]), None) + + def without_phones(name_email_phone_snailmail2): + return not next(iter(name_email_phone_snailmail2[1][1]), None) + + def without_address(name_e_p_snailmail): + return not next(iter(name_e_p_snailmail[1][2]), None) + luddites = grouped | beam.Filter( # People without email. - lambda name_email_phone_snailmail1: not next(iter(name_email_phone_snailmail1[1][0]), None)) + without_email) writers = grouped | beam.Filter( # People without phones. - lambda name_email_phone_snailmail2: not next(iter(name_email_phone_snailmail2[1][1]), None)) + without_phones) nomads = grouped | beam.Filter( # People without addresses. 
- lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), None)) + without_address) num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 259f95d..fe7929e 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -119,11 +119,17 @@ class CountWords(beam.PTransform): """ def expand(self, pcoll): + def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + return (pcoll | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) - | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))) + | 'count' >> beam.Map(count_ones) + | 'format' >> beam.Map(format_result)) def run(argv=None): http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 873f3c3..10080c9 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -535,6 +535,9 @@ def examples_wordcount_templated(renames): lines = p | 'Read' >> ReadFromText(wordcount_options.input) # [END example_wordcount_templated] + def format_result(w_c): + return '%s: %s' % (w_c[0], 
w_c[1]) + ( lines | 'ExtractWords' >> beam.FlatMap( @@ -542,7 +545,7 @@ def examples_wordcount_templated(renames): | 'PairWithOnes' >> beam.Map(lambda x: (x, 1)) | 'Group' >> beam.GroupByKey() | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) - | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], word_c2[1])) + | 'Format' >> beam.Map(format_result) | 'Write' >> WriteToText(wordcount_options.output) ) @@ -611,8 +614,11 @@ def examples_wordcount_debugging(renames): [('Flourish', 3), ('stomach', 1)])) # [END example_wordcount_debugging_assert] + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + output = (filtered_words - | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], word_c1[1])) + | 'format' >> beam.Map(format_result) | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt')) p.visit(SnippetUtils.RenameFiles(renames)) @@ -1119,6 +1125,9 @@ def model_group_by_key(contents, output_path): import apache_beam as beam with TestPipeline() as p: # Use TestPipeline for testing. 
+ def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + words_and_counts = ( p | beam.Create(contents) @@ -1133,7 +1142,7 @@ def model_group_by_key(contents, output_path): grouped_words = words_and_counts | beam.GroupByKey() # [END model_group_by_key_transform] (grouped_words - | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], sum(word_counts[1]))) + | 'count words' >> beam.Map(count_ones) | beam.io.WriteToText(output_path)) http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 8a05991..d1af4ce 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -58,6 +58,9 @@ def run(argv=None): lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic) # Capitalize the characters in each line. + def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + transformed = (lines # Use a pre-defined function that imports the re package. | 'Split' >> ( @@ -65,7 +68,7 @@ def run(argv=None): | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) | 'Group' >> beam.GroupByKey() - | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) + | 'Count' >> beam.Map(count_ones) | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) # Write to PubSub. 
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/windowed_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py index 680314b..f88b942 100644 --- a/sdks/python/apache_beam/examples/windowed_wordcount.py +++ b/sdks/python/apache_beam/examples/windowed_wordcount.py @@ -69,13 +69,16 @@ def run(argv=None): lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic) # Capitalize the characters in each line. + def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + transformed = (lines | 'Split' >> (beam.FlatMap(find_words) .with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(2*60, 0)) | 'Group' >> beam.GroupByKey() - | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) + | 'Count' >> beam.Map(count_ones) | 'Format' >> beam.ParDo(FormatDoFn())) # Write to BigQuery. http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index e21e91d..60f1ffe 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -88,15 +88,21 @@ def run(argv=None): lines = p | 'read' >> ReadFromText(known_args.input) # Count the occurrences of each word. 
+ def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))) + | 'count' >> beam.Map(count_ones)) # Format the counts into a PCollection of strings. - output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])) + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + + output = counts | 'format' >> beam.Map(format_result) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index bdc4c16..7d18c0d 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -93,12 +93,15 @@ class CountWords(beam.PTransform): PCollection of (word, count) tuples. """ def expand(self, pcoll): + def count_ones(word_ones): + return (word_ones[0], sum(word_ones[1])) + return (pcoll | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))) + | 'count' >> beam.Map(count_ones)) def run(argv=None): @@ -141,8 +144,11 @@ def run(argv=None): # Format the counts into a PCollection of strings and write the output using # a "Write" transform that has side effects. 
# pylint: disable=unused-variable + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + output = (filtered_words - | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])) + | 'format' >> beam.Map(format_result) | 'write' >> WriteToText(known_args.output)) http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 01c3955..54fd1f1 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -106,7 +106,10 @@ def run(argv=None): | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. - output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], w_c[1])) + def format_result(w_c): + return '%s: %s' % (w_c[0], w_c[1]) + + output = counts | 'Format' >> beam.Map(format_result) # Write the output using a "Write" transform that has side effects. 
# pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py index d4063df..5b580a6 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py @@ -449,7 +449,11 @@ class MergeAccumulators(beam.PTransform): return beam.pvalue.PCollection(input.pipeline) else: merge_accumulators = self.combine_fn.merge_accumulators - return input | beam.Map(lambda k_vs: (k_vs[0], merge_accumulators(k_vs[1]))) + + def merge_with_existing_key(k_vs): + return (k_vs[0], merge_accumulators(k_vs[1])) + + return input | beam.Map(merge_with_existing_key) class ExtractOutputs(beam.PTransform): http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 5a56c7a..3afabaf 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -404,14 +404,20 @@ class TriggerPipelineTest(unittest.TestCase): def test_after_count(self): with TestPipeline() as p: + def construct_timestamped(k_t): + return TimestampedValue((k_t[0], k_t[1]), k_t[1]) + + def format_result(k_v): + return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1])) + result = (p | beam.Create([1, 2, 3, 4, 5, 10, 11]) | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)]) - | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), k_t[1])) + | beam.Map(construct_timestamped) | beam.WindowInto(FixedWindows(10), 
trigger=AfterCount(3), accumulation_mode=AccumulationMode.DISCARDING) | beam.GroupByKey() - | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1])))) + | beam.Map(format_result)) assert_that(result, equal_to( { 'A-5': {1, 2, 3, 4, 5},