Repository: beam Updated Branches: refs/heads/master 4e0d5f596 -> 7019aa70d
Make stage names consistent. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209 Branch: refs/heads/master Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba Parents: 4e0d5f5 Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Apr 11 10:29:15 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Apr 20 08:55:02 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/bigquery.py | 6 +++--- .../io/gcp/datastore/v1/datastoreio.py | 6 +++--- sdks/python/apache_beam/io/iobase.py | 2 +- sdks/python/apache_beam/pipeline.py | 4 ++-- sdks/python/apache_beam/transforms/core.py | 19 ++++++++++--------- sdks/python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 4 ++-- 7 files changed, 22 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 4686518..891f62a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side table. The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading::: - main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource() - side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() + main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource() + side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource() results = ( main_table - | 'process data' >> beam.Map( + | 'ProcessData' >> beam.Map( lambda element, side_input: ..., AsList(side_table))) There is no difference in how main and side inputs are read. What makes the http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index d9b3598..a0ccbbb 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform): # outputs a ``PCollection[Entity]``. queries = (pcoll.pipeline - | 'User Query' >> Create([self._query]) - | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn( + | 'UserQuery' >> Create([self._query]) + | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn( self._project, self._query, self._datastore_namespace, self._num_splits))) sharded_queries = (queries | GroupByKey() | Values() - | 'flatten' >> FlatMap(lambda x: x)) + | 'Flatten' >> FlatMap(lambda x: x)) entities = sharded_queries | 'Read' >> ParDo( ReadFromDatastore.ReadFn(self._project, self._datastore_namespace)) http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index d9df5c4..2cac67f 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform): | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() | 'Extract' >> core.FlatMap(lambda x: x[1])) - return do_once | 'finalize_write' >> core.FlatMap( + return do_once | 'FinalizeWrite' >> core.FlatMap( _finalize_write, self.sink, AsSingleton(init_result_coll), http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 2ff9eb3..8e811bc 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -32,11 +32,11 @@ Typical usage: # Add to the pipeline a "Create" transform. When executed this # transform will produce a PCollection object with the specified values. - pcoll = p | 'create' >> beam.Create([1, 2, 3]) + pcoll = p | 'Create' >> beam.Create([1, 2, 3]) # Another transform could be applied to pcoll, e.g., writing to a text file. # For other transforms, refer to transforms/ directory. - pcoll | 'write' >> beam.io.WriteToText('./output') + pcoll | 'Write' >> beam.io.WriteToText('./output') # run() will execute the DAG stored in the pipeline. The execution of the # nodes visited is done using the specified local runner. http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bdfddbb..2d28eec 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1098,21 +1098,22 @@ class GroupByKey(PTransform): # pylint: disable=bad-continuation return (pcoll - | 'reify_windows' >> (ParDo(self.ReifyWindows()) + | 'ReifyWindows' >> (ParDo(self.ReifyWindows()) .with_output_types(reify_output_type)) - | 'group_by_key' >> (GroupByKeyOnly() + | 'GroupByKey' >> (GroupByKeyOnly() .with_input_types(reify_output_type) .with_output_types(gbk_input_type)) - | ('group_by_window' >> ParDo( + | ('GroupByWindow' >> ParDo( self.GroupAlsoByWindow(pcoll.windowing)) .with_input_types(gbk_input_type) .with_output_types(gbk_output_type))) - # If the input_type is None, run the default - return (pcoll - | 'reify_windows' >> ParDo(self.ReifyWindows()) - | 'group_by_key' >> GroupByKeyOnly() - | 'group_by_window' >> ParDo( - self.GroupAlsoByWindow(pcoll.windowing))) + else: + # The input_type is None, run the default + return (pcoll + | 'ReifyWindows' >> ParDo(self.ReifyWindows()) + | 'GroupByKey' >> GroupByKeyOnly() + | 'GroupByWindow' >> ParDo( + self.GroupAlsoByWindow(pcoll.windowing))) @typehints.with_input_types(typehints.KV[K, V]) http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 9b7a37f..e2c4428 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -629,7 +629,7 @@ def ptransform_fn(fn): With either method the custom PTransform can be used in pipelines as if it were one of the "native" PTransforms:: - result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn) + result_pcoll = input_pcoll | 'Label' >> CustomMapper(somefn) Note that for both solutions the underlying implementation of the pipe operator (i.e., `|`) will inject the pcoll argument in its proper place http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index b92af83..78277c2 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -959,8 +959,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # If this type-checks than no error should be raised. d = (self.p - | 'bools' >> beam.Create([True, False, True]).with_output_types(bool) - | 'to_ints' >> beam.Map(bool_to_int)) + | 'Bools' >> beam.Create([True, False, True]).with_output_types(bool) + | 'ToInts' >> beam.Map(bool_to_int)) assert_that(d, equal_to([1, 0, 1])) self.p.run()