Repository: beam
Updated Branches:
  refs/heads/python-sdk b9cb29c83 -> 5588db8d1


Update tests and examples to use new labels.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03e18f0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03e18f0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03e18f0d

Branch: refs/heads/python-sdk
Commit: 03e18f0d5a51963c42a9b2fc629984c163181044
Parents: b9cb29c
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jan 18 15:52:43 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jan 18 15:52:43 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/complete/tfidf.py  | 12 ++++++------
 .../examples/complete/top_wikipedia_sessions.py     | 16 +++++++---------
 .../examples/cookbook/bigquery_side_input_test.py   |  6 ++----
 .../python/apache_beam/examples/cookbook/filters.py |  4 ++--
 .../apache_beam/examples/cookbook/mergecontacts.py  | 11 ++++++-----
 .../apache_beam/examples/snippets/snippets.py       |  5 ++---
 .../apache_beam/examples/streaming_wordcount.py     |  6 +++---
 sdks/python/apache_beam/io/bigquery.py              |  5 ++---
 sdks/python/apache_beam/io/fileio_test.py           |  6 +++---
 sdks/python/apache_beam/io/textio_test.py           |  6 +++---
 sdks/python/apache_beam/runners/runner_test.py      |  4 ++--
 .../apache_beam/transforms/ptransform_test.py       | 16 ++++++++--------
 12 files changed, 46 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 4d6e0d3..c048cdd 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -43,7 +43,7 @@ def read_documents(pipeline, uris):
     pcolls.append(
         pipeline
         | 'read: %s' % uri >> ReadFromText(uri)
-        | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
+        | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
   return pcolls | 'flatten read pcolls' >> beam.Flatten()
 
 
@@ -101,8 +101,8 @@ class TfIdf(beam.PTransform):
     # for a join by the URI key.
     uri_to_word_and_count = (
         uri_and_word_to_count
-        | beam.Map('shift keys',
-                   lambda ((uri, word), count): (uri, (word, count))))
+        | 'shift keys' >> beam.Map(
+            lambda ((uri, word), count): (uri, (word, count))))
 
     # Perform a CoGroupByKey (a sort of pre-join) on the prepared
     # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
@@ -149,9 +149,9 @@ class TfIdf(beam.PTransform):
     # DoFns in this way.
     word_to_df = (
         word_to_doc_count
-        | beam.Map('compute doc frequencies',
-                   lambda (word, count), total: (word, float(count) / total),
-                   AsSingleton(total_documents)))
+        | 'compute doc frequencies' >> beam.Map(
+            lambda (word, count), total: (word, float(count) / total),
+            AsSingleton(total_documents)))
 
     # Join the term frequency and document frequency collections,
     # each keyed on the word.

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 4920813..d19f66d 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -80,8 +80,8 @@ class ComputeSessions(beam.PTransform):
 
   def expand(self, pcoll):
     return (pcoll
-            | beam.WindowInto('ComputeSessionsWindow',
-                              window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
+            | 'ComputeSessionsWindow' >> beam.WindowInto(
+                window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
             | combiners.Count.PerElement())
 
 
@@ -93,11 +93,9 @@ class TopPerMonth(beam.PTransform):
 
   def expand(self, pcoll):
     return (pcoll
-            | beam.WindowInto('TopPerMonthWindow',
-                              window.FixedWindows(
-                                  size=THIRTY_DAYS_IN_SECONDS))
-            | combiners.core.CombineGlobally(
-                'Top',
+            | 'TopPerMonthWindow' >> beam.WindowInto(
+                window.FixedWindows(size=THIRTY_DAYS_IN_SECONDS))
+            | 'Top' >> combiners.core.CombineGlobally(
                 combiners.TopCombineFn(
                     10, lambda first, second: first[1] < second[1]))
             .without_defaults())
@@ -131,8 +129,8 @@ class ComputeTopSessions(beam.PTransform):
 
   def expand(self, pcoll):
     return (pcoll
-            | beam.ParDo('ExtractUserAndTimestamp',
-                         ExtractUserAndTimestampDoFn())
+            | 'ExtractUserAndTimestamp' >> beam.ParDo(
+                ExtractUserAndTimestampDoFn())
             | beam.Filter(lambda x: (abs(hash(x)) <=
                                      MAX_TIMESTAMP * self.sampling_threshold))
             | ComputeSessions()

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 926f141..66cab77 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -31,10 +31,8 @@ class BigQuerySideInputTest(unittest.TestCase):
     p = TestPipeline()
 
     group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
-    corpus_pcoll = p | beam.Create('create_corpus',
-                                   [{'f': 'corpus1'},
-                                    {'f': 'corpus2'},
-                                    {'f': 'corpus3'}])
+    corpus_pcoll = p | 'create_corpus' >> beam.Create(
+        [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
     words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
                                                      {'f': 'word2'},
                                                      {'f': 'word3'}])

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index b3a969a..7c77b9d 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -53,8 +53,8 @@ def filter_cold_days(input_data, month_filter):
   projection_fields = ['year', 'month', 'day', 'mean_temp']
   fields_of_interest = (
       input_data
-      | beam.Map('projected',
-                 lambda row: {f: row[f] for f in projection_fields}))
+      | 'projected' >> beam.Map(
+          lambda row: {f: row[f] for f in projection_fields}))
 
   # Compute the global mean temperature.
   global_mean = AsSingleton(

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 6906ae4..55bdc50 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -75,11 +75,12 @@ def run(argv=None, assert_results=None):
   def read_kv_textfile(label, textfile):
     return (p
             | 'read_%s' % label >> ReadFromText(textfile)
-            | beam.Map('backslash_%s' % label,
-                       lambda x: re.sub(r'\\', r'\\\\', x))
-            | beam.Map('escape_quotes_%s' % label,
-                       lambda x: re.sub(r'"', r'\"', x))
-            | beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1)))
+            | 'backslash_%s' % label >> beam.Map(
+                lambda x: re.sub(r'\\', r'\\\\', x))
+            | 'escape_quotes_%s' % label >> beam.Map(
+                lambda x: re.sub(r'"', r'\"', x))
+            | 'split_%s' % label >> beam.Map(
+                lambda x: re.split(r'\t+', x, 1)))
 
   # Read input databases.
   email = read_kv_textfile('email', known_args.input_email)

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index ff240eb..631ab2d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -778,9 +778,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   kvs = p | beam.core.Create(
       'CreateKVs', KVs)
 
-  kvs | beam.io.Write('WriteToSimpleKV',
-                      SimpleKVSink('http://url_to_simple_kv/',
-                                   final_table_name))
+  kvs | 'WriteToSimpleKV' >> beam.io.Write(
+      SimpleKVSink('http://url_to_simple_kv/', final_table_name))
   # [END model_custom_sink_use_new_sink]
 
   p.run().wait_until_finish()

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index adfc33d..e34a64e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -52,9 +52,9 @@ def run(argv=None):
 
   # Capitalize the characters in each line.
   transformed = (lines
-                 | (beam.FlatMap('split',
-                                 lambda x: re.findall(r'[A-Za-z\']+', x))
-                    .with_output_types(unicode))
+                 | 'split' >> (
+                     beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                     .with_output_types(unicode))
                  | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                  | beam.WindowInto(window.FixedWindows(15, 0))
                  | 'group' >> beam.GroupByKey()

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 9877ebf..8f55f42 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -49,9 +49,8 @@ to avoid excessive reading:
   side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
       main_table
-      | beam.Map('process data',
-               lambda element, side_input: ...,
-               AsList(side_table)))
+      | 'process data' >> beam.Map(
+          lambda element, side_input: ..., AsList(side_table)))
 
 There is no difference in how main and side inputs are read. What makes the
 side_table a 'side input' is the AsList wrapper used when passing the table

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 8842369..772e5e2 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -762,7 +762,7 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
     pipeline.run()
 
@@ -775,7 +775,7 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native_auto_compression(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
             self.path, file_name_suffix='.gz'))
@@ -790,7 +790,7 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native_auto_compression_unsharded(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
             self.path + '.gz', shard_name_template=''))

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 07ab4cc..ccb64ff 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -524,7 +524,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path)  # pylint: disable=expression-not-assigned
     pipeline.run()
 
@@ -537,7 +537,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow_auto_compression(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz')  # pylint: disable=expression-not-assigned
     pipeline.run()
 
@@ -550,7 +550,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow_auto_compression_unsharded(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='')  # pylint: disable=expression-not-assigned
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 89a6fdc..f95b295 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -205,8 +205,8 @@ class RunnerTest(unittest.TestCase):
                      '--temp_location=/dev/null',
                      '--no_auth=True'
                  ]))
-    rows = p | beam.io.Read('read',
-                            beam.io.BigQuerySource('dataset.faketable'))
+    rows = p | 'read' >> beam.io.Read(
+        beam.io.BigQuerySource('dataset.faketable'))
     with self.assertRaises(ValueError,
                            msg=('Coder for the GroupByKey operation'
                                 '"GroupByKey" is not a key-value coder: '

http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 58382e4..13b963c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -562,9 +562,9 @@ class PTransformTest(unittest.TestCase):
 @beam.ptransform_fn
 def SamplePTransform(pcoll):
   """Sample transform using the @ptransform_fn decorator."""
-  map_transform = beam.Map('ToPairs', lambda v: (v, None))
-  combine_transform = beam.CombinePerKey('Group', lambda vs: None)
-  keys_transform = beam.Keys('RemoveDuplicates')
+  map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None))
+  combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None)
+  keys_transform = 'RemoveDuplicates' >> beam.Keys()
   return pcoll | map_transform | combine_transform | keys_transform
 
 
@@ -575,15 +575,15 @@ class PTransformLabelsTest(unittest.TestCase):
     pardo = None
 
     def expand(self, pcoll):
-      self.pardo = beam.FlatMap('*do*', lambda x: [x + 1])
+      self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1])
       return pcoll | self.pardo
 
   def test_chained_ptransforms(self):
     """Tests that chaining gets proper nesting."""
     pipeline = TestPipeline()
-    map1 = beam.Map('map1', lambda x: (x, 1))
-    gbk = beam.GroupByKey('gbk')
-    map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
+    map1 = 'map1' >> beam.Map(lambda x: (x, 1))
+    gbk = 'gbk' >> beam.GroupByKey()
+    map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
     t = (map1 | gbk | map2)
     result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
     self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
@@ -636,7 +636,7 @@ class PTransformLabelsTest(unittest.TestCase):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
-    combine = beam.CombineGlobally('*sum*', sum)
+    combine = '*sum*' >> beam.CombineGlobally(sum)
     result = pcoll | combine
     self.assertTrue('*sum*' in pipeline.applied_labels)
     assert_that(result, equal_to([sum(vals)]))

Reply via email to