Repository: beam
Updated Branches:
  refs/heads/master 4e0d5f596 -> 7019aa70d


Make stage names consistent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209

Branch: refs/heads/master
Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba
Parents: 4e0d5f5
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Apr 11 10:29:15 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Apr 20 08:55:02 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py       |  6 +++---
 .../io/gcp/datastore/v1/datastoreio.py           |  6 +++---
 sdks/python/apache_beam/io/iobase.py             |  2 +-
 sdks/python/apache_beam/pipeline.py              |  4 ++--
 sdks/python/apache_beam/transforms/core.py       | 19 ++++++++++---------
 sdks/python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py    |  4 ++--
 7 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
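
The renames below settle on a single CamelCase style for stage labels. As a reminder, a label attaches to a transform with the right-shift operator and becomes the stage name that runners display; a minimal sketch with illustrative names:

  import apache_beam as beam

  p = beam.Pipeline()
  # Stage labels must be unique within a pipeline; this commit uses
  # CamelCase for all of them.
  evens = (p
           | 'CreateNumbers' >> beam.Create(range(10))
           | 'KeepEvens' >> beam.Filter(lambda n: n % 2 == 0))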


http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4686518..891f62a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side table. The runner
 may use some caching techniques to share the side inputs between calls in order
 to avoid excessive reading:::
 
-  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource())
-  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource())
+  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource())
+  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource())
   results = (
       main_table
-      | 'process data' >> beam.Map(
+      | 'ProcessData' >> beam.Map(
           lambda element, side_input: ..., AsList(side_table)))
 
 There is no difference in how main and side inputs are read. What makes the

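The docstring above shows the side-input pattern against BigQuery. A self-contained sketch of the same pattern, with in-memory data assumed in place of BigQuery so it runs without GCP credentials:

  import apache_beam as beam
  from apache_beam.pvalue import AsList

  p = beam.Pipeline()
  main_table = p | 'VeryBig' >> beam.Create([1, 2, 3])
  side_table = p | 'NotBig' >> beam.Create([10, 20])

  # Every main-input element sees the complete side input; a runner may
  # cache the side input instead of re-reading it for each element.
  results = (main_table
             | 'ProcessData' >> beam.Map(
                 lambda element, side: (element, sum(side)),
                 AsList(side_table)))
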
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index d9b3598..a0ccbbb 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform):
     #   outputs a ``PCollection[Entity]``.
 
     queries = (pcoll.pipeline
-               | 'User Query' >> Create([self._query])
-               | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn(
+               | 'UserQuery' >> Create([self._query])
+               | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn(
                    self._project, self._query, self._datastore_namespace,
                    self._num_splits)))
 
     sharded_queries = (queries
                        | GroupByKey()
                        | Values()
-                       | 'flatten' >> FlatMap(lambda x: x))
+                       | 'Flatten' >> FlatMap(lambda x: x))
 
     entities = sharded_queries | 'Read' >> ParDo(
         ReadFromDatastore.ReadFn(self._project, self._datastore_namespace))

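The 'Flatten' stage above uses a common idiom: FlatMap with the identity function turns a PCollection of iterables into a PCollection of their elements. A standalone sketch with illustrative data:

  import apache_beam as beam

  p = beam.Pipeline()
  flattened = (p
               | 'CreateLists' >> beam.Create([[1, 2], [3], [4, 5]])
               # Each input list is expanded element by element,
               # yielding the elements 1, 2, 3, 4, 5.
               | 'Flatten' >> beam.FlatMap(lambda x: x))
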
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index d9df5c4..2cac67f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform):
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
                            | 'Extract' >> core.FlatMap(lambda x: x[1]))
-    return do_once | 'finalize_write' >> core.FlatMap(
+    return do_once | 'FinalizeWrite' >> core.FlatMap(
         _finalize_write,
         self.sink,
         AsSingleton(init_result_coll),

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 2ff9eb3..8e811bc 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -32,11 +32,11 @@ Typical usage:
 
   # Add to the pipeline a "Create" transform. When executed this
   # transform will produce a PCollection object with the specified values.
-  pcoll = p | 'create' >> beam.Create([1, 2, 3])
+  pcoll = p | 'Create' >> beam.Create([1, 2, 3])
 
   # Another transform could be applied to pcoll, e.g., writing to a text file.
   # For other transforms, refer to transforms/ directory.
-  pcoll | 'write' >> beam.io.WriteToText('./output')
+  pcoll | 'Write' >> beam.io.WriteToText('./output')
 
   # run() will execute the DAG stored in the pipeline.  The execution of the
   # nodes visited is done using the specified local runner.

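For reference, a complete, runnable version of the docstring's typical-usage snippet (the output path is illustrative):

  import apache_beam as beam

  p = beam.Pipeline()
  pcoll = p | 'Create' >> beam.Create([1, 2, 3])
  pcoll | 'Write' >> beam.io.WriteToText('./output')
  # run() executes the DAG built above on the configured runner.
  p.run()
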
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index bdfddbb..2d28eec 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1098,21 +1098,22 @@ class GroupByKey(PTransform):
 
       # pylint: disable=bad-continuation
       return (pcoll
-              | 'reify_windows' >> (ParDo(self.ReifyWindows())
+              | 'ReifyWindows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | 'group_by_key' >> (GroupByKeyOnly()
+              | 'GroupByKey' >> (GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
-              | ('group_by_window' >> ParDo(
+              | ('GroupByWindow' >> ParDo(
                      self.GroupAlsoByWindow(pcoll.windowing))
                  .with_input_types(gbk_input_type)
                  .with_output_types(gbk_output_type)))
-    # If the input_type is None, run the default
-    return (pcoll
-            | 'reify_windows' >> ParDo(self.ReifyWindows())
-            | 'group_by_key' >> GroupByKeyOnly()
-            | 'group_by_window' >> ParDo(
-                self.GroupAlsoByWindow(pcoll.windowing)))
+    else:
+      # The input_type is None, so run the default
+      return (pcoll
+              | 'ReifyWindows' >> ParDo(self.ReifyWindows())
+              | 'GroupByKey' >> GroupByKeyOnly()
+              | 'GroupByWindow' >> ParDo(
+                    self.GroupAlsoByWindow(pcoll.windowing)))
 
 
 @typehints.with_input_types(typehints.KV[K, V])

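The change above renames GroupByKey's internal stages to CamelCase as well; the user-facing transform is unchanged. A usage sketch with illustrative data:

  import apache_beam as beam

  p = beam.Pipeline()
  grouped = (p
             | 'CreatePairs' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
             # Expands internally to the ReifyWindows, GroupByKey and
             # GroupByWindow stages renamed in the diff above.
             | 'GroupByKey' >> beam.GroupByKey())
  # grouped contains ('a', [1, 3]) and ('b', [2]); the order of the
  # grouped values is not guaranteed.
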
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9b7a37f..e2c4428 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -629,7 +629,7 @@ def ptransform_fn(fn):
   With either method the custom PTransform can be used in pipelines as if
   it were one of the "native" PTransforms::
 
-    result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
+    result_pcoll = input_pcoll | 'Label' >> CustomMapper(somefn)
 
   Note that for both solutions the underlying implementation of the pipe
   operator (i.e., `|`) will inject the pcoll argument in its proper place

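A sketch of the @ptransform_fn pattern the docstring refers to, reusing the docstring's hypothetical CustomMapper and somefn names:

  import apache_beam as beam
  from apache_beam.transforms.ptransform import ptransform_fn

  @ptransform_fn
  def CustomMapper(pcoll, somefn):
    # The pipe operator injects pcoll as the first argument; somefn
    # comes from the call site.
    return pcoll | beam.Map(somefn)

  p = beam.Pipeline()
  result_pcoll = (p
                  | 'Create' >> beam.Create(['x', 'y'])
                  | 'Label' >> CustomMapper(str.upper))
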
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index b92af83..78277c2 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -959,8 +959,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
    # If this type-checks then no error should be raised.
     d = (self.p
-         | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
-         | 'to_ints' >> beam.Map(bool_to_int))
+         | 'Bools' >> beam.Create([True, False, True]).with_output_types(bool)
+         | 'ToInts' >> beam.Map(bool_to_int))
     assert_that(d, equal_to([1, 0, 1]))
     self.p.run()
 
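The test above relies on declared type hints. A minimal sketch of attaching hints directly to transforms (names and data are illustrative):

  import apache_beam as beam

  p = beam.Pipeline()
  d = (p
       | 'Bools' >> beam.Create([True, False, True]).with_output_types(bool)
       # With hints declared, the SDK type-checks the pipeline at
       # construction time instead of failing at runtime.
       | 'ToInts' >> beam.Map(int).with_input_types(bool).with_output_types(int))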
