Repository: beam Updated Branches: refs/heads/master d0601b30c -> 474345f59
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index f7b51a7..6654fef 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -119,7 +119,6 @@ class ParDoTest(unittest.TestCase): self.assertEqual({'A', 'C'}, set(all_capitals)) def test_pardo_with_label(self): - # pylint: disable=line-too-long words = ['aa', 'bbc', 'defg'] # [START model_pardo_with_label] result = words | 'CountUniqueLetters' >> beam.Map( @@ -129,41 +128,41 @@ class ParDoTest(unittest.TestCase): self.assertEqual({1, 2, 4}, set(result)) def test_pardo_side_input(self): - p = TestPipeline() - words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) - - # [START model_pardo_side_input] - # Callable takes additional arguments. - def filter_using_length(word, lower_bound, upper_bound=float('inf')): - if lower_bound <= len(word) <= upper_bound: - yield word - - # Construct a deferred side input. - avg_word_len = (words - | beam.Map(len) - | beam.CombineGlobally(beam.combiners.MeanCombineFn())) - - # Call with explicit side inputs. - small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) - - # A single deferred side input. - larger_than_average = (words | 'large' >> beam.FlatMap( - filter_using_length, - lower_bound=pvalue.AsSingleton(avg_word_len))) - - # Mix and match. - small_but_nontrivial = words | beam.FlatMap(filter_using_length, - lower_bound=2, - upper_bound=pvalue.AsSingleton( - avg_word_len)) - # [END model_pardo_side_input] - - assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) - assert_that(larger_than_average, equal_to(['ccc', 'dddd']), - label='larger_than_average') - assert_that(small_but_nontrivial, equal_to(['bb']), - label='small_but_not_trivial') - p.run() + # pylint: disable=line-too-long + with TestPipeline() as p: + words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) + + # [START model_pardo_side_input] + # Callable takes additional arguments. + def filter_using_length(word, lower_bound, upper_bound=float('inf')): + if lower_bound <= len(word) <= upper_bound: + yield word + + # Construct a deferred side input. + avg_word_len = (words + | beam.Map(len) + | beam.CombineGlobally(beam.combiners.MeanCombineFn())) + + # Call with explicit side inputs. + small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) + + # A single deferred side input. + larger_than_average = (words | 'large' >> beam.FlatMap( + filter_using_length, + lower_bound=pvalue.AsSingleton(avg_word_len))) + + # Mix and match. + small_but_nontrivial = words | beam.FlatMap( + filter_using_length, + lower_bound=2, + upper_bound=pvalue.AsSingleton(avg_word_len)) + # [END model_pardo_side_input] + + assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) + assert_that(larger_than_average, equal_to(['ccc', 'dddd']), + label='larger_than_average') + assert_that(small_but_nontrivial, equal_to(['bb']), + label='small_but_not_trivial') def test_pardo_side_input_dofn(self): words = ['a', 'bb', 'ccc', 'dddd'] @@ -307,10 +306,9 @@ class TypeHintsTest(unittest.TestCase): def test_runtime_checks_off(self): # pylint: disable=expression-not-assigned - p = TestPipeline() - # [START type_hints_runtime_off] - p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) - p.run() + with TestPipeline() as p: + # [START type_hints_runtime_off] + p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) # [END type_hints_runtime_off] def test_runtime_checks_on(self): @@ -323,47 +321,45 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_on] def test_deterministic_key(self): - p = TestPipeline() - lines = (p | beam.Create( - ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) + with TestPipeline() as p: + lines = (p | beam.Create( + ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) - # For pickling - global Player # pylint: disable=global-variable-not-assigned + # For pickling + global Player # pylint: disable=global-variable-not-assigned - # [START type_hints_deterministic_key] - class Player(object): - def __init__(self, team, name): - self.team = team - self.name = name + # [START type_hints_deterministic_key] + class Player(object): + def __init__(self, team, name): + self.team = team + self.name = name - class PlayerCoder(beam.coders.Coder): - def encode(self, player): - return '%s:%s' % (player.team, player.name) + class PlayerCoder(beam.coders.Coder): + def encode(self, player): + return '%s:%s' % (player.team, player.name) - def decode(self, s): - return Player(*s.split(':')) + def decode(self, s): + return Player(*s.split(':')) - def is_deterministic(self): - return True + def is_deterministic(self): + return True - beam.coders.registry.register_coder(Player, PlayerCoder) + beam.coders.registry.register_coder(Player, PlayerCoder) - def parse_player_and_score(csv): - name, team, score = csv.split(',') - return Player(team, name), int(score) + def parse_player_and_score(csv): + name, team, score = csv.split(',') + return Player(team, name), int(score) - totals = ( - lines - | beam.Map(parse_player_and_score) - | beam.CombinePerKey(sum).with_input_types( - beam.typehints.Tuple[Player, int])) - # [END type_hints_deterministic_key] + totals = ( + lines + | beam.Map(parse_player_and_score) + | beam.CombinePerKey(sum).with_input_types( + beam.typehints.Tuple[Player, int])) + # [END type_hints_deterministic_key] - assert_that( - totals | beam.Map(lambda (k, v): (k.name, v)), - equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) - - p.run() + assert_that( + totals | beam.Map(lambda (k, v): (k.name, v)), + equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) class SnippetsTest(unittest.TestCase): @@ -802,109 +798,104 @@ class CombineTest(unittest.TestCase): self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts)) def test_setting_fixed_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_fixed_windows] - from apache_beam import window - fixed_windowed_items = ( - items | 'window' >> beam.WindowInto(window.FixedWindows(60))) - # [END setting_fixed_windows] - summed = (fixed_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([110, 215, 120])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_fixed_windows] + from apache_beam import window + fixed_windowed_items = ( + items | 'window' >> beam.WindowInto(window.FixedWindows(60))) + # [END setting_fixed_windows] + summed = (fixed_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([110, 215, 120])) def test_setting_sliding_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 16, 23]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_sliding_windows] - from apache_beam import window - sliding_windowed_items = ( - items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5))) - # [END setting_sliding_windows] - summed = (sliding_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, - equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 16, 23]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_sliding_windows] + from apache_beam import window + sliding_windowed_items = ( + items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5))) + # [END setting_sliding_windows] + summed = (sliding_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, + equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) def test_setting_session_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 11, 16, 27]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_session_windows] - from apache_beam import window - session_windowed_items = ( - items | 'window' >> beam.WindowInto(window.Sessions(10))) - # [END setting_session_windows] - summed = (session_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, - equal_to([29, 27])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 11, 16, 27]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_session_windows] + from apache_beam import window + session_windowed_items = ( + items | 'window' >> beam.WindowInto(window.Sessions(10))) + # [END setting_session_windows] + summed = (session_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, + equal_to([29, 27])) def test_setting_global_window(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 11, 16, 27]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_global_window] - from apache_beam import window - session_windowed_items = ( - items | 'window' >> beam.WindowInto(window.GlobalWindows())) - # [END setting_global_window] - summed = (session_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([56])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 11, 16, 27]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_global_window] + from apache_beam import window + session_windowed_items = ( + items | 'window' >> beam.WindowInto(window.GlobalWindows())) + # [END setting_global_window] + summed = (session_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([56])) def test_setting_timestamp(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([12, 30, 60, 61, 66]) - items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x))) + with TestPipeline() as p: + unkeyed_items = p | beam.Create([12, 30, 60, 61, 66]) + items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x))) - def extract_timestamp_from_log_entry(entry): - return entry[1] + def extract_timestamp_from_log_entry(entry): + return entry[1] - # [START setting_timestamp] - class AddTimestampDoFn(beam.DoFn): + # [START setting_timestamp] + class AddTimestampDoFn(beam.DoFn): - def process(self, element): - # Extract the numeric Unix seconds-since-epoch timestamp to be - # associated with the current log entry. - unix_timestamp = extract_timestamp_from_log_entry(element) - # Wrap and emit the current entry and new timestamp in a - # TimestampedValue. - yield beam.window.TimestampedValue(element, unix_timestamp) - - timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) - # [END setting_timestamp] - fixed_windowed_items = ( - timestamped_items | 'window' >> beam.WindowInto( - beam.window.FixedWindows(60))) - summed = (fixed_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([42, 187])) - p.run() + def process(self, element): + # Extract the numeric Unix seconds-since-epoch timestamp to be + # associated with the current log entry. + unix_timestamp = extract_timestamp_from_log_entry(element) + # Wrap and emit the current entry and new timestamp in a + # TimestampedValue. + yield beam.window.TimestampedValue(element, unix_timestamp) + + timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) + # [END setting_timestamp] + fixed_windowed_items = ( + timestamped_items | 'window' >> beam.WindowInto( + beam.window.FixedWindows(60))) + summed = (fixed_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([42, 187])) class PTransformTest(unittest.TestCase): @@ -919,10 +910,9 @@ class PTransformTest(unittest.TestCase): return pcoll | beam.Map(lambda x: len(x)) # [END model_composite_transform] - p = TestPipeline() - lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths() - assert_that(lengths, equal_to([1, 2, 3])) - p.run() + with TestPipeline() as p: + lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths() + assert_that(lengths, equal_to([1, 2, 3])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcap.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index d0cc8a2..ce43e1f 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -41,22 +41,20 @@ def run(argv=None): help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) + with beam.Pipeline(argv=pipeline_args) as p: - # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) + # Read the text file[pattern] into a PCollection. + lines = p | beam.io.Read( + beam.io.PubSubSource(known_args.input_topic)) - # Capitalize the characters in each line. - transformed = (lines - | 'capitalize' >> (beam.Map(lambda x: x.upper()))) + # Capitalize the characters in each line. + transformed = (lines + | 'capitalize' >> (beam.Map(lambda x: x.upper()))) - # Write to PubSub. - # pylint: disable=expression-not-assigned - transformed | beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) - - p.run().wait_until_finish() + # Write to PubSub. + # pylint: disable=expression-not-assigned + transformed | beam.io.Write( + beam.io.PubSubSink(known_args.output_topic)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 4b6aecc..e9d5dbe 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -44,29 +44,27 @@ def run(argv=None): help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) - - # Capitalize the characters in each line. - transformed = (lines - | 'Split' >> ( - beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) - | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) - | beam.WindowInto(window.FixedWindows(15, 0)) - | 'Group' >> beam.GroupByKey() - | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) - | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) - - # Write to PubSub. - # pylint: disable=expression-not-assigned - transformed | 'pubsub_write' >> beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) - - p.run().wait_until_finish() + with beam.Pipeline(argv=pipeline_args) as p: + + # Read the text file[pattern] into a PCollection. + lines = p | 'read' >> beam.io.Read( + beam.io.PubSubSource(known_args.input_topic)) + + # Capitalize the characters in each line. + transformed = (lines + | 'Split' >> ( + beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | beam.WindowInto(window.FixedWindows(15, 0)) + | 'Group' >> beam.GroupByKey() + | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) + + # Write to PubSub. + # pylint: disable=expression-not-assigned + transformed | 'pubsub_write' >> beam.io.Write( + beam.io.PubSubSink(known_args.output_topic)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index e7e542a..34dedb2 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -102,7 +102,6 @@ def run(argv=None): # pylint: disable=expression-not-assigned output | 'write' >> WriteToText(known_args.output) - # Actually run the pipeline (all operations above are deferred). result = p.run() result.wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index ca9f7b6..c0ffd35 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -118,35 +118,32 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read the text file[pattern] into a PCollection, count the occurrences of - # each word and filter by a list of words. - filtered_words = ( - p | 'read' >> ReadFromText(known_args.input) - | CountWords() - | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) - - # assert_that is a convenient PTransform that checks a PCollection has an - # expected value. Asserts are best used in unit tests with small data sets but - # is demonstrated here as a teaching tool. - # - # Note assert_that does not provide any output and that successful completion - # of the Pipeline implies that the expectations were met. Learn more at - # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to - # test your pipeline. - assert_that( - filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) - - # Format the counts into a PCollection of strings and write the output using a - # "Write" transform that has side effects. - # pylint: disable=unused-variable - output = (filtered_words - | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - | 'write' >> WriteToText(known_args.output)) - - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + # Read the text file[pattern] into a PCollection, count the occurrences of + # each word and filter by a list of words. + filtered_words = ( + p | 'read' >> ReadFromText(known_args.input) + | CountWords() + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) + + # assert_that is a convenient PTransform that checks a PCollection has an + # expected value. Asserts are best used in unit tests with small data sets + # but is demonstrated here as a teaching tool. + # + # Note assert_that does not provide any output and that successful + # completion of the Pipeline implies that the expectations were met. Learn + # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline + # on how to best test your pipeline. + assert_that( + filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) + + # Format the counts into a PCollection of strings and write the output using + # a "Write" transform that has side effects. + # pylint: disable=unused-variable + output = (filtered_words + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'write' >> WriteToText(known_args.output)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 5109c08..76b0a22 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -92,28 +92,25 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> ReadFromText(known_args.input) + # Read the text file[pattern] into a PCollection. + lines = p | ReadFromText(known_args.input) - # Count the occurrences of each word. - counts = (lines - | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) + # Count the occurrences of each word. + counts = ( + lines + | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | 'GroupAndSum' >> beam.CombinePerKey(sum)) - # Format the counts into a PCollection of strings. - output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + # Format the counts into a PCollection of strings. + output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) - - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | WriteToText(known_args.output) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/io/filebasedsink_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py index 1f6aeee..7c8ddb4 100644 --- a/sdks/python/apache_beam/io/filebasedsink_test.py +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -146,9 +146,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp): sink = MyFileBasedSink( temp_path, file_name_suffix='.output', coder=coders.ToStringCoder() ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() + with TestPipeline() as p: + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned self.assertEqual( open(temp_path + '-00000-of-00001.output').read(), '[start][end]') @@ -160,9 +159,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp): file_name_suffix=StaticValueProvider(value_type=str, value='.output'), coder=coders.ToStringCoder() ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() + with TestPipeline() as p: + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned self.assertEqual( open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]') @@ -174,10 +172,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp): num_shards=3, shard_name_template='_NN_SSS_', coder=coders.ToStringCoder()) - p = TestPipeline() - p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - - p.run() + with TestPipeline() as p: + p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned concat = ''.join( open(temp_path + '_03_%03d_.output' % shard_num).read() http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5048534..9093abf 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -28,19 +28,18 @@ to be executed for each node visited is specified through a runner object. Typical usage: # Create a pipeline object using a local runner for execution. - p = beam.Pipeline('DirectRunner') + with beam.Pipeline('DirectRunner') as p: - # Add to the pipeline a "Create" transform. When executed this - # transform will produce a PCollection object with the specified values. - pcoll = p | 'Create' >> beam.Create([1, 2, 3]) + # Add to the pipeline a "Create" transform. When executed this + # transform will produce a PCollection object with the specified values. + pcoll = p | 'Create' >> beam.Create([1, 2, 3]) - # Another transform could be applied to pcoll, e.g., writing to a text file. - # For other transforms, refer to transforms/ directory. - pcoll | 'Write' >> beam.io.WriteToText('./output') + # Another transform could be applied to pcoll, e.g., writing to a text file. + # For other transforms, refer to transforms/ directory. + pcoll | 'Write' >> beam.io.WriteToText('./output') - # run() will execute the DAG stored in the pipeline. The execution of the - # nodes visited is done using the specified local runner. - p.run() + # run() will execute the DAG stored in the pipeline. The execution of the + # nodes visited is done using the specified local runner. """ http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 946a60a..c79fec8 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -247,26 +247,23 @@ class CombineTest(unittest.TestCase): pipeline.run() def test_tuple_combine_fn(self): - p = TestPipeline() - result = ( - p - | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) - | beam.CombineGlobally(combine.TupleCombineFn(max, - combine.MeanCombineFn(), - sum)).without_defaults()) - assert_that(result, equal_to([('c', 111.0 / 3, 99.0)])) - p.run() + with TestPipeline() as p: + result = ( + p + | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) + | beam.CombineGlobally(combine.TupleCombineFn( + max, combine.MeanCombineFn(), sum)).without_defaults()) + assert_that(result, equal_to([('c', 111.0 / 3, 99.0)])) def test_tuple_combine_fn_without_defaults(self): - p = TestPipeline() - result = ( - p - | Create([1, 1, 2, 3]) - | beam.CombineGlobally( - combine.TupleCombineFn(min, combine.MeanCombineFn(), max) - .with_common_input()).without_defaults()) - assert_that(result, equal_to([(1, 7.0 / 4, 3)])) - p.run() + with TestPipeline() as p: + result = ( + p + | Create([1, 1, 2, 3]) + | beam.CombineGlobally( + combine.TupleCombineFn(min, combine.MeanCombineFn(), max) + .with_common_input()).without_defaults()) + assert_that(result, equal_to([(1, 7.0 / 4, 3)])) def test_to_list_and_to_dict(self): pipeline = TestPipeline() @@ -295,29 +292,26 @@ class CombineTest(unittest.TestCase): pipeline.run() def test_combine_globally_with_default(self): - p = TestPipeline() - assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0])) - p.run() + with TestPipeline() as p: + assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0])) def test_combine_globally_without_default(self): - p = TestPipeline() - result = p | Create([]) | CombineGlobally(sum).without_defaults() - assert_that(result, equal_to([])) - p.run() + with TestPipeline() as p: + result = p | Create([]) | CombineGlobally(sum).without_defaults() + assert_that(result, equal_to([])) def test_combine_globally_with_default_side_input(self): - class CombineWithSideInput(PTransform): + class SideInputCombine(PTransform): def expand(self, pcoll): side = pcoll | CombineGlobally(sum).as_singleton_view() main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) - p = TestPipeline() - result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() - result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() - assert_that(result1, equal_to([0]), label='r1') - assert_that(result2, equal_to([10]), label='r2') - p.run() + with TestPipeline() as p: + result1 = p | 'i1' >> Create([]) | 'c1' >> SideInputCombine() + result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> SideInputCombine() + assert_that(result1, equal_to([0]), label='r1') + assert_that(result2, equal_to([10]), label='r2') if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index fd1bb9d..977a364 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -167,90 +167,85 @@ class WindowTest(unittest.TestCase): | Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()]))) def test_sliding_windows(self): - p = TestPipeline() - pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) - result = (pcoll - | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) - | GroupByKey() - | reify_windows) - expected = [('key @ [-2.0, 2.0)', [1]), - ('key @ [0.0, 4.0)', [1, 2, 3]), - ('key @ [2.0, 6.0)', [2, 3])] - assert_that(result, equal_to(expected)) - p.run() + with TestPipeline() as p: + pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) + result = (pcoll + | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) + | GroupByKey() + | reify_windows) + expected = [('key @ [-2.0, 2.0)', [1]), + ('key @ [0.0, 4.0)', [1, 2, 3]), + ('key @ [2.0, 6.0)', [2, 3])] + assert_that(result, equal_to(expected)) def test_sessions(self): - p = TestPipeline() - pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) - result = (pcoll - | 'w' >> WindowInto(Sessions(10)) - | GroupByKey() - | sort_values - | reify_windows) - expected = [('key @ [1.0, 13.0)', [1, 2, 3]), - ('key @ [20.0, 45.0)', [20, 27, 35])] - assert_that(result, equal_to(expected)) - p.run() + with TestPipeline() as p: + pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) + result = (pcoll + | 'w' >> WindowInto(Sessions(10)) + | GroupByKey() + | sort_values + | reify_windows) + expected = [('key @ [1.0, 13.0)', [1, 2, 3]), + ('key @ [20.0, 45.0)', [20, 27, 35])] + assert_that(result, equal_to(expected)) def test_timestamped_value(self): - p = TestPipeline() - result = (p - | 'start' >> Create([(k, k) for k in range(10)]) - | Map(lambda (x, t): TimestampedValue(x, t)) - | 'w' >> WindowInto(FixedWindows(5)) - | Map(lambda v: ('key', v)) - | GroupByKey()) - assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]), - ('key', [5, 6, 7, 8, 9])])) - p.run() + with TestPipeline() as p: + result = (p + | 'start' >> Create([(k, k) for k in range(10)]) + | Map(lambda (x, t): TimestampedValue(x, t)) + | 'w' >> WindowInto(FixedWindows(5)) + | Map(lambda v: ('key', v)) + | GroupByKey()) + assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]), + ('key', [5, 6, 7, 8, 9])])) def test_rewindow(self): - p = TestPipeline() - result = (p - | Create([(k, k) for k in range(10)]) - | Map(lambda (x, t): TimestampedValue(x, t)) - | 'window' >> WindowInto(SlidingWindows(period=2, size=6)) - # Per the model, each element is now duplicated across - # three windows. Rewindowing must preserve this duplication. - | 'rewindow' >> WindowInto(FixedWindows(5)) - | 'rewindow2' >> WindowInto(FixedWindows(5)) - | Map(lambda v: ('key', v)) - | GroupByKey()) - assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)), - ('key', sorted([5, 6, 7, 8, 9] * 3))])) - p.run() + with TestPipeline() as p: + result = (p + | Create([(k, k) for k in range(10)]) + | Map(lambda (x, t): TimestampedValue(x, t)) + | 'window' >> WindowInto(SlidingWindows(period=2, size=6)) + # Per the model, each element is now duplicated across + # three windows. Rewindowing must preserve this duplication. + | 'rewindow' >> WindowInto(FixedWindows(5)) + | 'rewindow2' >> WindowInto(FixedWindows(5)) + | Map(lambda v: ('key', v)) + | GroupByKey()) + assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)), + ('key', sorted([5, 6, 7, 8, 9] * 3))])) def test_timestamped_with_combiners(self): - p = TestPipeline() - result = (p - # Create some initial test values. - | 'start' >> Create([(k, k) for k in range(10)]) - # The purpose of the WindowInto transform is to establish a - # FixedWindows windowing function for the PCollection. - # It does not bucket elements into windows since the timestamps - # from Create are not spaced 5 ms apart and very likely they all - # fall into the same window. - | 'w' >> WindowInto(FixedWindows(5)) - # Generate timestamped values using the values as timestamps. - # Now there are values 5 ms apart and since Map propagates the - # windowing function from input to output the output PCollection - # will have elements falling into different 5ms windows. - | Map(lambda (x, t): TimestampedValue(x, t)) - # We add a 'key' to each value representing the index of the - # window. This is important since there is no guarantee of - # order for the elements of a PCollection. - | Map(lambda v: (v / 5, v))) - # Sum all elements associated with a key and window. Although it - # is called CombinePerKey it is really CombinePerKeyAndWindow the - # same way GroupByKey is really GroupByKeyAndWindow. - sum_per_window = result | CombinePerKey(sum) - # Compute mean per key and window. - mean_per_window = result | combiners.Mean.PerKey() - assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]), - label='assert:sum') - assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]), - label='assert:mean') - p.run() + with TestPipeline() as p: + result = (p + # Create some initial test values. + | 'start' >> Create([(k, k) for k in range(10)]) + # The purpose of the WindowInto transform is to establish a + # FixedWindows windowing function for the PCollection. + # It does not bucket elements into windows since the timestamps + # from Create are not spaced 5 ms apart and very likely they all + # fall into the same window. + | 'w' >> WindowInto(FixedWindows(5)) + # Generate timestamped values using the values as timestamps. + # Now there are values 5 ms apart and since Map propagates the + # windowing function from input to output the output PCollection + # will have elements falling into different 5ms windows. + | Map(lambda (x, t): TimestampedValue(x, t)) + # We add a 'key' to each value representing the index of the + # window. This is important since there is no guarantee of + # order for the elements of a PCollection. + | Map(lambda v: (v / 5, v))) + # Sum all elements associated with a key and window. Although it + # is called CombinePerKey it is really CombinePerKeyAndWindow the + # same way GroupByKey is really GroupByKeyAndWindow. + sum_per_window = result | CombinePerKey(sum) + # Compute mean per key and window. + mean_per_window = result | combiners.Mean.PerKey() + assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]), + label='assert:sum') + assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]), + label='assert:mean') class RunnerApiTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index e31b9cc..50f0deb 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -98,11 +98,10 @@ class WriteTest(unittest.TestCase): return_write_results=True): write_to_test_sink = WriteToTestSink(return_init_result, return_write_results) - p = TestPipeline() - result = p | beam.Create(data) | write_to_test_sink | beam.Map(list) + with TestPipeline() as p: + result = p | beam.Create(data) | write_to_test_sink | beam.Map(list) - assert_that(result, is_empty()) - p.run() + assert_that(result, is_empty()) sink = write_to_test_sink.last_sink self.assertIsNotNone(sink) http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 589dc0e..c81ef32 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -168,12 +168,11 @@ class SideInputTest(unittest.TestCase): @typehints.with_input_types(str, int) def repeat(s, times): return s * times - p = TestPipeline() - main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | 'side' >> beam.Create([3]) - result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) - assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc'])) - p.run() + with TestPipeline() as p: + main_input = p | beam.Create(['a', 'bb', 'c']) + side_input = p | 'side' >> beam.Create([3]) + result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) + assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc'])) bad_side_input = p | 'bad_side' >> beam.Create(['z']) with self.assertRaises(typehints.TypeCheckError): @@ -183,12 +182,11 @@ class SideInputTest(unittest.TestCase): @typehints.with_input_types(str, typehints.Iterable[str]) def concat(glue, items): return glue.join(sorted(items)) - p = TestPipeline() - main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) - result = main_input | beam.Map(concat, pvalue.AsIter(side_input)) - assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz'])) - p.run() + with TestPipeline() as p: + main_input = p | beam.Create(['a', 'bb', 'c']) + side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) + result = main_input | beam.Map(concat, pvalue.AsIter(side_input)) + assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz'])) bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3]) with self.assertRaises(typehints.TypeCheckError):