Repository: beam
Updated Branches:
  refs/heads/master d0601b30c -> 474345f59


http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f7b51a7..6654fef 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -119,7 +119,6 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'A', 'C'}, set(all_capitals))
 
   def test_pardo_with_label(self):
-    # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
     result = words | 'CountUniqueLetters' >> beam.Map(
@@ -129,41 +128,41 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
-    p = TestPipeline()
-    words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
-
-    # [START model_pardo_side_input]
-    # Callable takes additional arguments.
-    def filter_using_length(word, lower_bound, upper_bound=float('inf')):
-      if lower_bound <= len(word) <= upper_bound:
-        yield word
-
-    # Construct a deferred side input.
-    avg_word_len = (words
-                    | beam.Map(len)
-                    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
-
-    # Call with explicit side inputs.
-    small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
-
-    # A single deferred side input.
-    larger_than_average = (words | 'large' >> beam.FlatMap(
-        filter_using_length,
-        lower_bound=pvalue.AsSingleton(avg_word_len)))
-
-    # Mix and match.
-    small_but_nontrivial = words | beam.FlatMap(filter_using_length,
-                                                lower_bound=2,
-                                                upper_bound=pvalue.AsSingleton(
-                                                    avg_word_len))
-    # [END model_pardo_side_input]
-
-    assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
-    assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
-                label='larger_than_average')
-    assert_that(small_but_nontrivial, equal_to(['bb']),
-                label='small_but_not_trivial')
-    p.run()
+    # pylint: disable=line-too-long
+    with TestPipeline() as p:
+      words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
+
+      # [START model_pardo_side_input]
+      # Callable takes additional arguments.
+      def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+        if lower_bound <= len(word) <= upper_bound:
+          yield word
+
+      # Construct a deferred side input.
+      avg_word_len = (words
+                      | beam.Map(len)
+                      | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
+
+      # Call with explicit side inputs.
+      small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
+
+      # A single deferred side input.
+      larger_than_average = (words | 'large' >> beam.FlatMap(
+          filter_using_length,
+          lower_bound=pvalue.AsSingleton(avg_word_len)))
+
+      # Mix and match.
+      small_but_nontrivial = words | beam.FlatMap(
+          filter_using_length,
+          lower_bound=2,
+          upper_bound=pvalue.AsSingleton(avg_word_len))
+      # [END model_pardo_side_input]
+
+      assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+      assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+                  label='larger_than_average')
+      assert_that(small_but_nontrivial, equal_to(['bb']),
+                  label='small_but_not_trivial')
 
   def test_pardo_side_input_dofn(self):
     words = ['a', 'bb', 'ccc', 'dddd']
@@ -307,10 +306,9 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_off(self):
     # pylint: disable=expression-not-assigned
-    p = TestPipeline()
-    # [START type_hints_runtime_off]
-    p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
-    p.run()
+    with TestPipeline() as p:
+      # [START type_hints_runtime_off]
+      p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
     # [END type_hints_runtime_off]
 
   def test_runtime_checks_on(self):
@@ -323,47 +321,45 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-    p = TestPipeline()
-    lines = (p | beam.Create(
-        ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
+    with TestPipeline() as p:
+      lines = (p | beam.Create(
+          ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
-    # For pickling
-    global Player  # pylint: disable=global-variable-not-assigned
+      # For pickling
+      global Player  # pylint: disable=global-variable-not-assigned
 
-    # [START type_hints_deterministic_key]
-    class Player(object):
-      def __init__(self, team, name):
-        self.team = team
-        self.name = name
+      # [START type_hints_deterministic_key]
+      class Player(object):
+        def __init__(self, team, name):
+          self.team = team
+          self.name = name
 
-    class PlayerCoder(beam.coders.Coder):
-      def encode(self, player):
-        return '%s:%s' % (player.team, player.name)
+      class PlayerCoder(beam.coders.Coder):
+        def encode(self, player):
+          return '%s:%s' % (player.team, player.name)
 
-      def decode(self, s):
-        return Player(*s.split(':'))
+        def decode(self, s):
+          return Player(*s.split(':'))
 
-      def is_deterministic(self):
-        return True
+        def is_deterministic(self):
+          return True
 
-    beam.coders.registry.register_coder(Player, PlayerCoder)
+      beam.coders.registry.register_coder(Player, PlayerCoder)
 
-    def parse_player_and_score(csv):
-      name, team, score = csv.split(',')
-      return Player(team, name), int(score)
+      def parse_player_and_score(csv):
+        name, team, score = csv.split(',')
+        return Player(team, name), int(score)
 
-    totals = (
-        lines
-        | beam.Map(parse_player_and_score)
-        | beam.CombinePerKey(sum).with_input_types(
-            beam.typehints.Tuple[Player, int]))
-    # [END type_hints_deterministic_key]
+      totals = (
+          lines
+          | beam.Map(parse_player_and_score)
+          | beam.CombinePerKey(sum).with_input_types(
+              beam.typehints.Tuple[Player, int]))
+      # [END type_hints_deterministic_key]
 
-    assert_that(
-        totals | beam.Map(lambda (k, v): (k.name, v)),
-        equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
-
-    p.run()
+      assert_that(
+          totals | beam.Map(lambda (k, v): (k.name, v)),
+          equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
 
 
 class SnippetsTest(unittest.TestCase):
@@ -802,109 +798,104 @@ class CombineTest(unittest.TestCase):
     self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
 
   def test_setting_fixed_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_fixed_windows]
-    from apache_beam import window
-    fixed_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
-    # [END setting_fixed_windows]
-    summed = (fixed_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([110, 215, 120]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_fixed_windows]
+      from apache_beam import window
+      fixed_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
+      # [END setting_fixed_windows]
+      summed = (fixed_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([110, 215, 120]))
 
   def test_setting_sliding_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 16, 23])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_sliding_windows]
-    from apache_beam import window
-    sliding_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
-    # [END setting_sliding_windows]
-    summed = (sliding_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed,
-                equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 16, 23])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_sliding_windows]
+      from apache_beam import window
+      sliding_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
+      # [END setting_sliding_windows]
+      summed = (sliding_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed,
+                  equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
 
   def test_setting_session_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 11, 16, 27])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_session_windows]
-    from apache_beam import window
-    session_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.Sessions(10)))
-    # [END setting_session_windows]
-    summed = (session_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed,
-                equal_to([29, 27]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 11, 16, 27])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_session_windows]
+      from apache_beam import window
+      session_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.Sessions(10)))
+      # [END setting_session_windows]
+      summed = (session_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed,
+                  equal_to([29, 27]))
 
   def test_setting_global_window(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 11, 16, 27])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_global_window]
-    from apache_beam import window
-    session_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.GlobalWindows()))
-    # [END setting_global_window]
-    summed = (session_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([56]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 11, 16, 27])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_global_window]
+      from apache_beam import window
+      session_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.GlobalWindows()))
+      # [END setting_global_window]
+      summed = (session_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([56]))
 
   def test_setting_timestamp(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
-    items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
+      items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
 
-    def extract_timestamp_from_log_entry(entry):
-      return entry[1]
+      def extract_timestamp_from_log_entry(entry):
+        return entry[1]
 
-    # [START setting_timestamp]
-    class AddTimestampDoFn(beam.DoFn):
+      # [START setting_timestamp]
+      class AddTimestampDoFn(beam.DoFn):
 
-      def process(self, element):
-        # Extract the numeric Unix seconds-since-epoch timestamp to be
-        # associated with the current log entry.
-        unix_timestamp = extract_timestamp_from_log_entry(element)
-        # Wrap and emit the current entry and new timestamp in a
-        # TimestampedValue.
-        yield beam.window.TimestampedValue(element, unix_timestamp)
-
-    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
-    # [END setting_timestamp]
-    fixed_windowed_items = (
-        timestamped_items | 'window' >> beam.WindowInto(
-            beam.window.FixedWindows(60)))
-    summed = (fixed_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([42, 187]))
-    p.run()
+        def process(self, element):
+          # Extract the numeric Unix seconds-since-epoch timestamp to be
+          # associated with the current log entry.
+          unix_timestamp = extract_timestamp_from_log_entry(element)
+          # Wrap and emit the current entry and new timestamp in a
+          # TimestampedValue.
+          yield beam.window.TimestampedValue(element, unix_timestamp)
+
+      timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
+      # [END setting_timestamp]
+      fixed_windowed_items = (
+          timestamped_items | 'window' >> beam.WindowInto(
+              beam.window.FixedWindows(60)))
+      summed = (fixed_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([42, 187]))
 
 
 class PTransformTest(unittest.TestCase):
@@ -919,10 +910,9 @@ class PTransformTest(unittest.TestCase):
         return pcoll | beam.Map(lambda x: len(x))
     # [END model_composite_transform]
 
-    p = TestPipeline()
-    lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
-    assert_that(lengths, equal_to([1, 2, 3]))
-    p.run()
+    with TestPipeline() as p:
+      lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
+      assert_that(lengths, equal_to([1, 2, 3]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d0cc8a2..ce43e1f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -41,22 +41,20 @@ def run(argv=None):
       help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
+  with beam.Pipeline(argv=pipeline_args) as p:
 
-  # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read(
-      beam.io.PubSubSource(known_args.input_topic))
+    # Read the text file[pattern] into a PCollection.
+    lines = p | beam.io.Read(
+        beam.io.PubSubSource(known_args.input_topic))
 
-  # Capitalize the characters in each line.
-  transformed = (lines
-                 | 'capitalize' >> (beam.Map(lambda x: x.upper())))
+    # Capitalize the characters in each line.
+    transformed = (lines
+                   | 'capitalize' >> (beam.Map(lambda x: x.upper())))
 
-  # Write to PubSub.
-  # pylint: disable=expression-not-assigned
-  transformed | beam.io.Write(
-      beam.io.PubSubSink(known_args.output_topic))
-
-  p.run().wait_until_finish()
+    # Write to PubSub.
+    # pylint: disable=expression-not-assigned
+    transformed | beam.io.Write(
+        beam.io.PubSubSink(known_args.output_topic))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 4b6aecc..e9d5dbe 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -44,29 +44,27 @@ def run(argv=None):
       help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
-
-  # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(
-      beam.io.PubSubSource(known_args.input_topic))
-
-  # Capitalize the characters in each line.
-  transformed = (lines
-                 | 'Split' >> (
-                     beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-                     .with_output_types(unicode))
-                 | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-                 | beam.WindowInto(window.FixedWindows(15, 0))
-                 | 'Group' >> beam.GroupByKey()
-                 | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-                 | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
-
-  # Write to PubSub.
-  # pylint: disable=expression-not-assigned
-  transformed | 'pubsub_write' >> beam.io.Write(
-      beam.io.PubSubSink(known_args.output_topic))
-
-  p.run().wait_until_finish()
+  with beam.Pipeline(argv=pipeline_args) as p:
+
+    # Read the text file[pattern] into a PCollection.
+    lines = p | 'read' >> beam.io.Read(
+        beam.io.PubSubSource(known_args.input_topic))
+
+    # Capitalize the characters in each line.
+    transformed = (lines
+                   | 'Split' >> (
+                       beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                       .with_output_types(unicode))
+                   | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+                   | beam.WindowInto(window.FixedWindows(15, 0))
+                   | 'Group' >> beam.GroupByKey()
+                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                   | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
+
+    # Write to PubSub.
+    # pylint: disable=expression-not-assigned
+    transformed | 'pubsub_write' >> beam.io.Write(
+        beam.io.PubSubSink(known_args.output_topic))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e7e542a..34dedb2 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -102,7 +102,6 @@ def run(argv=None):
   # pylint: disable=expression-not-assigned
   output | 'write' >> WriteToText(known_args.output)
 
-  # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   result.wait_until_finish()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ca9f7b6..c0ffd35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -118,35 +118,32 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Read the text file[pattern] into a PCollection, count the occurrences of
-  # each word and filter by a list of words.
-  filtered_words = (
-      p | 'read' >> ReadFromText(known_args.input)
-      | CountWords()
-      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
-
-  # assert_that is a convenient PTransform that checks a PCollection has an
-  # expected value. Asserts are best used in unit tests with small data sets but
-  # is demonstrated here as a teaching tool.
-  #
-  # Note assert_that does not provide any output and that successful completion
-  # of the Pipeline implies that the expectations were  met. Learn more at
-  # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
-  # test your pipeline.
-  assert_that(
-      filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
-
-  # Format the counts into a PCollection of strings and write the output using a
-  # "Write" transform that has side effects.
-  # pylint: disable=unused-variable
-  output = (filtered_words
-            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | 'write' >> WriteToText(known_args.output))
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    # Read the text file[pattern] into a PCollection, count the occurrences of
+    # each word and filter by a list of words.
+    filtered_words = (
+        p | 'read' >> ReadFromText(known_args.input)
+        | CountWords()
+        | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
+
+    # assert_that is a convenient PTransform that checks a PCollection has an
+    # expected value. Asserts are best used in unit tests with small data sets
+    # but is demonstrated here as a teaching tool.
+    #
+    # Note assert_that does not provide any output and that successful
+    # completion of the Pipeline implies that the expectations were  met. Learn
+    # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
+    # on how to best test your pipeline.
+    assert_that(
+        filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
+
+    # Format the counts into a PCollection of strings and write the output using
+    # a "Write" transform that has side effects.
+    # pylint: disable=unused-variable
+    output = (filtered_words
+              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+              | 'write' >> WriteToText(known_args.output))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 5109c08..76b0a22 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -92,28 +92,25 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(known_args.input)
+    # Read the text file[pattern] into a PCollection.
+    lines = p | ReadFromText(known_args.input)
 
-  # Count the occurrences of each word.
-  counts = (lines
-            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-                          .with_output_types(unicode))
-            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
-            | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+    # Count the occurrences of each word.
+    counts = (
+        lines
+        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                      .with_output_types(unicode))
+        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+        | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
-  # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+    # Format the counts into a PCollection of strings.
+    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
 
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(known_args.output)
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run().wait_until_finish()
+    # Write the output using a "Write" transform that has side effects.
+    # pylint: disable=expression-not-assigned
+    output | WriteToText(known_args.output)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/io/filebasedsink_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 1f6aeee..7c8ddb4 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -146,9 +146,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
     sink = MyFileBasedSink(
         temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
     )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     self.assertEqual(
         open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
 
@@ -160,9 +159,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
         coder=coders.ToStringCoder()
     )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     self.assertEqual(
         open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
 
@@ -174,10 +172,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
-    p = TestPipeline()
-    p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
 
     concat = ''.join(
         open(temp_path + '_03_%03d_.output' % shard_num).read()

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5048534..9093abf 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -28,19 +28,18 @@ to be executed for each node visited is specified through a runner object.
 Typical usage:
 
   # Create a pipeline object using a local runner for execution.
-  p = beam.Pipeline('DirectRunner')
+  with beam.Pipeline('DirectRunner') as p:
 
-  # Add to the pipeline a "Create" transform. When executed this
-  # transform will produce a PCollection object with the specified values.
-  pcoll = p | 'Create' >> beam.Create([1, 2, 3])
+    # Add to the pipeline a "Create" transform. When executed this
+    # transform will produce a PCollection object with the specified values.
+    pcoll = p | 'Create' >> beam.Create([1, 2, 3])
 
-  # Another transform could be applied to pcoll, e.g., writing to a text file.
-  # For other transforms, refer to transforms/ directory.
-  pcoll | 'Write' >> beam.io.WriteToText('./output')
+    # Another transform could be applied to pcoll, e.g., writing to a text file.
+    # For other transforms, refer to transforms/ directory.
+    pcoll | 'Write' >> beam.io.WriteToText('./output')
 
-  # run() will execute the DAG stored in the pipeline.  The execution of the
-  # nodes visited is done using the specified local runner.
-  p.run()
+    # run() will execute the DAG stored in the pipeline.  The execution of the
+    # nodes visited is done using the specified local runner.
 
 """
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 946a60a..c79fec8 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -247,26 +247,23 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_tuple_combine_fn(self):
-    p = TestPipeline()
-    result = (
-        p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
-        | beam.CombineGlobally(combine.TupleCombineFn(max,
-                                                      combine.MeanCombineFn(),
-                                                      sum)).without_defaults())
-    assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
-    p.run()
+    with TestPipeline() as p:
+      result = (
+          p
+          | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+          | beam.CombineGlobally(combine.TupleCombineFn(
+              max, combine.MeanCombineFn(), sum)).without_defaults())
+      assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
 
   def test_tuple_combine_fn_without_defaults(self):
-    p = TestPipeline()
-    result = (
-        p
-        | Create([1, 1, 2, 3])
-        | beam.CombineGlobally(
-            combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
-            .with_common_input()).without_defaults())
-    assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
-    p.run()
+    with TestPipeline() as p:
+      result = (
+          p
+          | Create([1, 1, 2, 3])
+          | beam.CombineGlobally(
+              combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
+              .with_common_input()).without_defaults())
+      assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
 
   def test_to_list_and_to_dict(self):
     pipeline = TestPipeline()
@@ -295,29 +292,26 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_combine_globally_with_default(self):
-    p = TestPipeline()
-    assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
-    p.run()
+    with TestPipeline() as p:
+      assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
 
   def test_combine_globally_without_default(self):
-    p = TestPipeline()
-    result = p | Create([]) | CombineGlobally(sum).without_defaults()
-    assert_that(result, equal_to([]))
-    p.run()
+    with TestPipeline() as p:
+      result = p | Create([]) | CombineGlobally(sum).without_defaults()
+      assert_that(result, equal_to([]))
 
   def test_combine_globally_with_default_side_input(self):
-    class CombineWithSideInput(PTransform):
+    class SideInputCombine(PTransform):
       def expand(self, pcoll):
         side = pcoll | CombineGlobally(sum).as_singleton_view()
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
 
-    p = TestPipeline()
-    result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
-    result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
-    assert_that(result1, equal_to([0]), label='r1')
-    assert_that(result2, equal_to([10]), label='r2')
-    p.run()
+    with TestPipeline() as p:
+      result1 = p | 'i1' >> Create([]) | 'c1' >> SideInputCombine()
+      result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> SideInputCombine()
+      assert_that(result1, equal_to([0]), label='r1')
+      assert_that(result2, equal_to([10]), label='r2')
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index fd1bb9d..977a364 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -167,90 +167,85 @@ class WindowTest(unittest.TestCase):
             | Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()])))
 
   def test_sliding_windows(self):
-    p = TestPipeline()
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
-    result = (pcoll
-              | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
-              | GroupByKey()
-              | reify_windows)
-    expected = [('key @ [-2.0, 2.0)', [1]),
-                ('key @ [0.0, 4.0)', [1, 2, 3]),
-                ('key @ [2.0, 6.0)', [2, 3])]
-    assert_that(result, equal_to(expected))
-    p.run()
+    with TestPipeline() as p:
+      pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
+      result = (pcoll
+                | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
+                | GroupByKey()
+                | reify_windows)
+      expected = [('key @ [-2.0, 2.0)', [1]),
+                  ('key @ [0.0, 4.0)', [1, 2, 3]),
+                  ('key @ [2.0, 6.0)', [2, 3])]
+      assert_that(result, equal_to(expected))
 
   def test_sessions(self):
-    p = TestPipeline()
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
-    result = (pcoll
-              | 'w' >> WindowInto(Sessions(10))
-              | GroupByKey()
-              | sort_values
-              | reify_windows)
-    expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
-                ('key @ [20.0, 45.0)', [20, 27, 35])]
-    assert_that(result, equal_to(expected))
-    p.run()
+    with TestPipeline() as p:
+      pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
+      result = (pcoll
+                | 'w' >> WindowInto(Sessions(10))
+                | GroupByKey()
+                | sort_values
+                | reify_windows)
+      expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
+                  ('key @ [20.0, 45.0)', [20, 27, 35])]
+      assert_that(result, equal_to(expected))
 
   def test_timestamped_value(self):
-    p = TestPipeline()
-    result = (p
-              | 'start' >> Create([(k, k) for k in range(10)])
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              | 'w' >> WindowInto(FixedWindows(5))
-              | Map(lambda v: ('key', v))
-              | GroupByKey())
-    assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
-                                  ('key', [5, 6, 7, 8, 9])]))
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                | 'start' >> Create([(k, k) for k in range(10)])
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                | 'w' >> WindowInto(FixedWindows(5))
+                | Map(lambda v: ('key', v))
+                | GroupByKey())
+      assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
+                                    ('key', [5, 6, 7, 8, 9])]))
 
   def test_rewindow(self):
-    p = TestPipeline()
-    result = (p
-              | Create([(k, k) for k in range(10)])
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
-              # Per the model, each element is now duplicated across
-              # three windows. Rewindowing must preserve this duplication.
-              | 'rewindow' >> WindowInto(FixedWindows(5))
-              | 'rewindow2' >> WindowInto(FixedWindows(5))
-              | Map(lambda v: ('key', v))
-              | GroupByKey())
-    assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
-                                  ('key', sorted([5, 6, 7, 8, 9] * 3))]))
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                | Create([(k, k) for k in range(10)])
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+                # Per the model, each element is now duplicated across
+                # three windows. Rewindowing must preserve this duplication.
+                | 'rewindow' >> WindowInto(FixedWindows(5))
+                | 'rewindow2' >> WindowInto(FixedWindows(5))
+                | Map(lambda v: ('key', v))
+                | GroupByKey())
+      assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+                                    ('key', sorted([5, 6, 7, 8, 9] * 3))]))
 
   def test_timestamped_with_combiners(self):
-    p = TestPipeline()
-    result = (p
-              # Create some initial test values.
-              | 'start' >> Create([(k, k) for k in range(10)])
-              # The purpose of the WindowInto transform is to establish a
-              # FixedWindows windowing function for the PCollection.
-              # It does not bucket elements into windows since the timestamps
-              # from Create are not spaced 5 ms apart and very likely they all
-              # fall into the same window.
-              | 'w' >> WindowInto(FixedWindows(5))
-              # Generate timestamped values using the values as timestamps.
-              # Now there are values 5 ms apart and since Map propagates the
-              # windowing function from input to output the output PCollection
-              # will have elements falling into different 5ms windows.
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              # We add a 'key' to each value representing the index of the
-              # window. This is important since there is no guarantee of
-              # order for the elements of a PCollection.
-              | Map(lambda v: (v / 5, v)))
-    # Sum all elements associated with a key and window. Although it
-    # is called CombinePerKey it is really CombinePerKeyAndWindow the
-    # same way GroupByKey is really GroupByKeyAndWindow.
-    sum_per_window = result | CombinePerKey(sum)
-    # Compute mean per key and window.
-    mean_per_window = result | combiners.Mean.PerKey()
-    assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
-                label='assert:sum')
-    assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
-                label='assert:mean')
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                # Create some initial test values.
+                | 'start' >> Create([(k, k) for k in range(10)])
+                # The purpose of the WindowInto transform is to establish a
+                # FixedWindows windowing function for the PCollection.
+                # It does not bucket elements into windows since the timestamps
+                # from Create are not spaced 5 ms apart and very likely they all
+                # fall into the same window.
+                | 'w' >> WindowInto(FixedWindows(5))
+                # Generate timestamped values using the values as timestamps.
+                # Now there are values 5 ms apart and since Map propagates the
+                # windowing function from input to output the output PCollection
+                # will have elements falling into different 5ms windows.
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                # We add a 'key' to each value representing the index of the
+                # window. This is important since there is no guarantee of
+                # order for the elements of a PCollection.
+                | Map(lambda v: (v / 5, v)))
+      # Sum all elements associated with a key and window. Although it
+      # is called CombinePerKey it is really CombinePerKeyAndWindow the
+      # same way GroupByKey is really GroupByKeyAndWindow.
+      sum_per_window = result | CombinePerKey(sum)
+      # Compute mean per key and window.
+      mean_per_window = result | combiners.Mean.PerKey()
+      assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
+                  label='assert:sum')
+      assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
+                  label='assert:mean')
 
 
 class RunnerApiTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index e31b9cc..50f0deb 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -98,11 +98,10 @@ class WriteTest(unittest.TestCase):
                       return_write_results=True):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
-    p = TestPipeline()
-    result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
+    with TestPipeline() as p:
+      result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
 
-    assert_that(result, is_empty())
-    p.run()
+      assert_that(result, is_empty())
 
     sink = write_to_test_sink.last_sink
     self.assertIsNotNone(sink)

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 589dc0e..c81ef32 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -168,12 +168,11 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
       return s * times
-    p = TestPipeline()
-    main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | 'side' >> beam.Create([3])
-    result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
-    assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
-    p.run()
+    with TestPipeline() as p:
+      main_input = p | beam.Create(['a', 'bb', 'c'])
+      side_input = p | 'side' >> beam.Create([3])
+      result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
+      assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
 
     bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
@@ -183,12 +182,11 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, typehints.Iterable[str])
     def concat(glue, items):
       return glue.join(sorted(items))
-    p = TestPipeline()
-    main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
-    result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
-    assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
-    p.run()
+    with TestPipeline() as p:
+      main_input = p | beam.Create(['a', 'bb', 'c'])
+      side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
+      result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
+      assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
 
     bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
     with self.assertRaises(typehints.TypeCheckError):

Reply via email to