Repository: beam Updated Branches: refs/heads/python-sdk b9cb29c83 -> 5588db8d1
Update tests and examples to use new labels. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03e18f0d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03e18f0d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03e18f0d Branch: refs/heads/python-sdk Commit: 03e18f0d5a51963c42a9b2fc629984c163181044 Parents: b9cb29c Author: Ahmet Altay <al...@google.com> Authored: Wed Jan 18 15:52:43 2017 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Wed Jan 18 15:52:43 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/examples/complete/tfidf.py | 12 ++++++------ .../examples/complete/top_wikipedia_sessions.py | 16 +++++++--------- .../examples/cookbook/bigquery_side_input_test.py | 6 ++---- .../python/apache_beam/examples/cookbook/filters.py | 4 ++-- .../apache_beam/examples/cookbook/mergecontacts.py | 11 ++++++----- .../apache_beam/examples/snippets/snippets.py | 5 ++--- .../apache_beam/examples/streaming_wordcount.py | 6 +++--- sdks/python/apache_beam/io/bigquery.py | 5 ++--- sdks/python/apache_beam/io/fileio_test.py | 6 +++--- sdks/python/apache_beam/io/textio_test.py | 6 +++--- sdks/python/apache_beam/runners/runner_test.py | 4 ++-- .../apache_beam/transforms/ptransform_test.py | 16 ++++++++-------- 12 files changed, 46 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 4d6e0d3..c048cdd 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -43,7 +43,7 @@ def read_documents(pipeline, uris): pcolls.append( pipeline | 'read: %s' % uri >> ReadFromText(uri) - | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri)) + | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) return pcolls | 'flatten read pcolls' >> beam.Flatten() @@ -101,8 +101,8 @@ class TfIdf(beam.PTransform): # for a join by the URI key. uri_to_word_and_count = ( uri_and_word_to_count - | beam.Map('shift keys', - lambda ((uri, word), count): (uri, (word, count)))) + | 'shift keys' >> beam.Map( + lambda ((uri, word), count): (uri, (word, count)))) # Perform a CoGroupByKey (a sort of pre-join) on the prepared # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and @@ -149,9 +149,9 @@ class TfIdf(beam.PTransform): # DoFns in this way. word_to_df = ( word_to_doc_count - | beam.Map('compute doc frequencies', - lambda (word, count), total: (word, float(count) / total), - AsSingleton(total_documents))) + | 'compute doc frequencies' >> beam.Map( + lambda (word, count), total: (word, float(count) / total), + AsSingleton(total_documents))) # Join the term frequency and document frequency collections, # each keyed on the word. http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 4920813..d19f66d 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -80,8 +80,8 @@ class ComputeSessions(beam.PTransform): def expand(self, pcoll): return (pcoll - | beam.WindowInto('ComputeSessionsWindow', - window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)) + | 'ComputeSessionsWindow' >> beam.WindowInto( + window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)) | combiners.Count.PerElement()) @@ -93,11 +93,9 @@ class TopPerMonth(beam.PTransform): def expand(self, pcoll): return (pcoll - | beam.WindowInto('TopPerMonthWindow', - window.FixedWindows( - size=THIRTY_DAYS_IN_SECONDS)) - | combiners.core.CombineGlobally( - 'Top', + | 'TopPerMonthWindow' >> beam.WindowInto( + window.FixedWindows(size=THIRTY_DAYS_IN_SECONDS)) + | 'Top' >> combiners.core.CombineGlobally( combiners.TopCombineFn( 10, lambda first, second: first[1] < second[1])) .without_defaults()) @@ -131,8 +129,8 @@ class ComputeTopSessions(beam.PTransform): def expand(self, pcoll): return (pcoll - | beam.ParDo('ExtractUserAndTimestamp', - ExtractUserAndTimestampDoFn()) + | 'ExtractUserAndTimestamp' >> beam.ParDo( + ExtractUserAndTimestampDoFn()) | beam.Filter(lambda x: (abs(hash(x)) <= MAX_TIMESTAMP * self.sampling_threshold)) | ComputeSessions() http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 926f141..66cab77 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -31,10 +31,8 @@ class BigQuerySideInputTest(unittest.TestCase): p = TestPipeline() group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | beam.Create('create_corpus', - [{'f': 'corpus1'}, - {'f': 'corpus2'}, - {'f': 'corpus3'}]) + corpus_pcoll = p | 'create_corpus' >> beam.Create( + [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index b3a969a..7c77b9d 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -53,8 +53,8 @@ def filter_cold_days(input_data, month_filter): projection_fields = ['year', 'month', 'day', 'mean_temp'] fields_of_interest = ( input_data - | beam.Map('projected', - lambda row: {f: row[f] for f in projection_fields})) + | 'projected' >> beam.Map( + lambda row: {f: row[f] for f in projection_fields})) # Compute the global mean temperature. global_mean = AsSingleton( http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 6906ae4..55bdc50 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -75,11 +75,12 @@ def run(argv=None, assert_results=None): def read_kv_textfile(label, textfile): return (p | 'read_%s' % label >> ReadFromText(textfile) - | beam.Map('backslash_%s' % label, - lambda x: re.sub(r'\\', r'\\\\', x)) - | beam.Map('escape_quotes_%s' % label, - lambda x: re.sub(r'"', r'\"', x)) - | beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1))) + | 'backslash_%s' % label >> beam.Map( + lambda x: re.sub(r'\\', r'\\\\', x)) + | 'escape_quotes_%s' % label >> beam.Map( + lambda x: re.sub(r'"', r'\"', x)) + | 'split_%s' % label >> beam.Map( + lambda x: re.split(r'\t+', x, 1))) # Read input databases. email = read_kv_textfile('email', known_args.input_email) http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index ff240eb..631ab2d 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -778,9 +778,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, kvs = p | beam.core.Create( 'CreateKVs', KVs) - kvs | beam.io.Write('WriteToSimpleKV', - SimpleKVSink('http://url_to_simple_kv/', - final_table_name)) + kvs | 'WriteToSimpleKV' >> beam.io.Write( + SimpleKVSink('http://url_to_simple_kv/', final_table_name)) # [END model_custom_sink_use_new_sink] p.run().wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index adfc33d..e34a64e 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -52,9 +52,9 @@ def run(argv=None): # Capitalize the characters in each line. transformed = (lines - | (beam.FlatMap('split', - lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) + | 'split' >> ( + beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) | 'group' >> beam.GroupByKey() http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 9877ebf..8f55f42 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -49,9 +49,8 @@ to avoid excessive reading: side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() results = ( main_table - | beam.Map('process data', - lambda element, side_input: ..., - AsList(side_table))) + | 'process data' >> beam.Map( + lambda element, side_input: ..., AsList(side_table))) There is no difference in how main and side inputs are read. What makes the side_table a 'side input' is the AsList wrapper used when passing the table http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 8842369..772e5e2 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -762,7 +762,7 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned pipeline.run() @@ -775,7 +775,7 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path, file_name_suffix='.gz')) @@ -790,7 +790,7 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path + '.gz', shard_name_template='')) http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 07ab4cc..ccb64ff 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -524,7 +524,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned pipeline.run() @@ -537,7 +537,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned pipeline.run() @@ -550,7 +550,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 89a6fdc..f95b295 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -205,8 +205,8 @@ class RunnerTest(unittest.TestCase): '--temp_location=/dev/null', '--no_auth=True' ])) - rows = p | beam.io.Read('read', - beam.io.BigQuerySource('dataset.faketable')) + rows = p | 'read' >> beam.io.Read( + beam.io.BigQuerySource('dataset.faketable')) with self.assertRaises(ValueError, msg=('Coder for the GroupByKey operation' '"GroupByKey" is not a key-value coder: ' http://git-wip-us.apache.org/repos/asf/beam/blob/03e18f0d/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 58382e4..13b963c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -562,9 +562,9 @@ class PTransformTest(unittest.TestCase): @beam.ptransform_fn def SamplePTransform(pcoll): """Sample transform using the @ptransform_fn decorator.""" - map_transform = beam.Map('ToPairs', lambda v: (v, None)) - combine_transform = beam.CombinePerKey('Group', lambda vs: None) - keys_transform = beam.Keys('RemoveDuplicates') + map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None)) + combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None) + keys_transform = 'RemoveDuplicates' >> beam.Keys() return pcoll | map_transform | combine_transform | keys_transform @@ -575,15 +575,15 @@ class PTransformLabelsTest(unittest.TestCase): pardo = None def expand(self, pcoll): - self.pardo = beam.FlatMap('*do*', lambda x: [x + 1]) + self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1]) return pcoll | self.pardo def test_chained_ptransforms(self): """Tests that chaining gets proper nesting.""" pipeline = TestPipeline() - map1 = beam.Map('map1', lambda x: (x, 1)) - gbk = beam.GroupByKey('gbk') - map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones))) + map1 = 'map1' >> beam.Map(lambda x: (x, 1)) + gbk = 'gbk' >> beam.GroupByKey() + map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) t = (map1 | gbk | map2) result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels) @@ -636,7 +636,7 @@ class PTransformLabelsTest(unittest.TestCase): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) - combine = beam.CombineGlobally('*sum*', sum) + combine = '*sum*' >> beam.CombineGlobally(sum) result = pcoll | combine self.assertTrue('*sum*' in pipeline.applied_labels) assert_that(result, equal_to([sum(vals)]))