Repository: beam
Updated Branches:
  refs/heads/python-sdk 86d420376 -> 4ba0b60a8
Updates snippets to use Beam text source and sink.

Removes the dependency snippets_test has on the Dataflow native text sink.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30a01845
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30a01845
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30a01845

Branch: refs/heads/python-sdk
Commit: 30a018458f51a70c0e0d6e5431b219157af8a350
Parents: 86d4203
Author: Chamikara Jayalath <[email protected]>
Authored: Wed Jan 11 17:50:02 2017 -0800
Committer: Chamikara Jayalath <[email protected]>
Committed: Wed Jan 11 17:50:18 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/custom_ptransform.py      |   4 +-
 .../apache_beam/examples/snippets/snippets.py   | 100 +++++++++----------
 .../examples/snippets/snippets_test.py          |  96 ++++++++++++++++--
 3 files changed, 136 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
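For reference, a minimal before/after sketch of the IO migration this commit
applies throughout the snippets (the 'gs://a-bucket/...' paths and the
'ReadLines'/'WriteLines' labels are placeholders, not taken from the commit):

    import apache_beam as beam

    p = beam.Pipeline()

    # Before: Read/Write wrapping the Dataflow-native text source/sink.
    #   lines = p | beam.io.Read(beam.io.TextFileSource('gs://a-bucket/in.txt'))
    #   lines | beam.io.Write(beam.io.TextFileSink('gs://a-bucket/out'))

    # After: the Beam text source/sink exposed as composite PTransforms.
    lines = p | 'ReadLines' >> beam.io.ReadFromText('gs://a-bucket/in.txt')
    lines | 'WriteLines' >> beam.io.WriteToText('gs://a-bucket/out')

    p.run()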
""" @@ -60,14 +64,10 @@ class SnippetUtils(object): self.renames = renames def visit_transform(self, transform_node): - if hasattr(transform_node.transform, 'source'): - source = transform_node.transform.source - source.file_path = self.renames['read'] - source.is_gcs_source = False - elif hasattr(transform_node.transform, 'sink'): - sink = transform_node.transform.sink - sink.file_path = self.renames['write'] - sink.is_gcs_sink = False + if transform_node.full_label.find('DummyReadForTesting') >= 0: + transform_node.transform.fn.file_to_read = self.renames['read'] + elif transform_node.full_label.find('DummyWriteForTesting') >= 0: + transform_node.transform.fn.file_to_write = self.renames['write'] def construct_pipeline(renames): @@ -94,8 +94,7 @@ def construct_pipeline(renames): # [END pipelines_constructing_creating] # [START pipelines_constructing_reading] - lines = p | beam.io.Read('ReadMyFile', - beam.io.TextFileSource('gs://some/inputData.txt')) + lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt') # [END pipelines_constructing_reading] # [START pipelines_constructing_applying] @@ -105,8 +104,8 @@ def construct_pipeline(renames): # [START pipelines_constructing_writing] filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words) - filtered_words | 'WriteMyFile' >> beam.io.Write( - beam.io.TextFileSink('gs://some/outputData.txt')) + filtered_words | 'WriteMyFile' >> beam.io.WriteToText( + 'gs://some/outputData.txt') # [END pipelines_constructing_writing] p.visit(SnippetUtils.RenameFiles(renames)) @@ -147,10 +146,11 @@ def model_pipelines(argv): p = beam.Pipeline(options=pipeline_options) (p - | beam.io.Read(beam.io.TextFileSource(my_options.input)) + | beam.io.ReadFromText(my_options.input) | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | beam.Map(lambda x: (x, 1)) | beam.combiners.Count.PerKey() - | beam.io.Write(beam.io.TextFileSink(my_options.output))) + | beam.Map(lambda x: (x, 1)) + | beam.combiners.Count.PerKey() + | beam.io.WriteToText(my_options.output)) p.run() # [END model_pipelines] @@ -184,7 +184,7 @@ def model_pcollection(argv): 'Whether \'tis nobler in the mind to suffer ', 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ']) - | beam.io.Write(beam.io.TextFileSink(my_options.output))) + | beam.io.WriteToText(my_options.output)) p.run() # [END model_pcollection] @@ -241,8 +241,8 @@ def pipeline_options_remote(argv): options.view_as(StandardOptions).runner = 'DirectRunner' p = Pipeline(options=options) - lines = p | beam.io.Read(beam.io.TextFileSource(my_input)) - lines | beam.io.Write(beam.io.TextFileSink(my_output)) + lines = p | beam.io.ReadFromText(my_input) + lines | beam.io.WriteToText(my_output) p.run() @@ -282,8 +282,8 @@ def pipeline_options_local(argv): p = Pipeline(options=options) # [END pipeline_options_local] - lines = p | beam.io.Read(beam.io.TextFileSource(my_input)) - lines | beam.io.Write(beam.io.TextFileSink(my_output)) + lines = p | beam.io.ReadFromText(my_input) + lines | beam.io.WriteToText(my_output) p.run() @@ -304,9 +304,8 @@ def pipeline_options_command_line(argv): # Create the Pipeline with remaining arguments. 
   p = beam.Pipeline(argv=pipeline_args)
-  lines = p | beam.io.Read('ReadFromText',
-                           beam.io.TextFileSource(known_args.input))
-  lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
+  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
   # [END pipeline_options_command_line]

   p.run()
@@ -344,7 +343,7 @@ def pipeline_logging(lines, output):
   (p
    | beam.Create(lines)
    | beam.ParDo(ExtractWordsFn())
-   | beam.io.Write(beam.io.TextFileSink(output)))
+   | beam.io.WriteToText(output))

   p.run()

@@ -404,11 +403,11 @@ def pipeline_monitoring(renames):
   # [START pipeline_monitoring_execution]
   (p
    # Read the lines of the input text.
-   | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
+   | 'ReadLines' >> beam.io.ReadFromText(options.input)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
-   | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
+   | 'WriteCounts' >> beam.io.WriteToText(options.output))
   # [END pipeline_monitoring_execution]

   p.visit(SnippetUtils.RenameFiles(renames))
@@ -448,8 +447,8 @@ def examples_wordcount_minimal(renames):

   (
       # [START examples_wordcount_minimal_read]
-      p | beam.io.Read(beam.io.TextFileSource(
-          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      p | beam.io.ReadFromText(
+          'gs://dataflow-samples/shakespeare/kinglear.txt')
       # [END examples_wordcount_minimal_read]

       # [START examples_wordcount_minimal_pardo]
@@ -465,7 +464,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_map]

       # [START examples_wordcount_minimal_write]
-      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+      | beam.io.WriteToText('gs://my-bucket/counts.txt')
       # [END examples_wordcount_minimal_write]
   )

@@ -502,8 +501,8 @@ def examples_wordcount_wordcount(renames):
   p = beam.Pipeline(options=options)
   # [END examples_wordcount_wordcount_options]

-  lines = p | beam.io.Read(beam.io.TextFileSource(
-      'gs://dataflow-samples/shakespeare/kinglear.txt'))
+  lines = p | beam.io.ReadFromText(
+      'gs://dataflow-samples/shakespeare/kinglear.txt')

   # [START examples_wordcount_wordcount_composite]
   class CountWords(beam.PTransform):
@@ -530,7 +529,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]

-  formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+  formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()

@@ -588,8 +587,8 @@ def examples_wordcount_debugging(renames):
   p = beam.Pipeline(options=PipelineOptions())
   filtered_words = (
       p
-      | beam.io.Read(beam.io.TextFileSource(
-          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      | beam.io.ReadFromText(
+          'gs://dataflow-samples/shakespeare/kinglear.txt')
       | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       | beam.combiners.Count.PerElement()
       | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
@@ -601,8 +600,7 @@ def examples_wordcount_debugging(renames):

   output = (filtered_words
             | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | beam.io.Write(
-                'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
+            | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))

   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()

@@ -872,18 +870,16 @@ def model_textio(renames):
   # [START model_textio_read]
   p = beam.Pipeline(options=PipelineOptions())
   # [START model_pipelineio_read]
-  lines = p | beam.io.Read(
-      'ReadFromText',
-      beam.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+      'gs://my_bucket/path/to/input-*.csv')
   # [END model_pipelineio_read]
   # [END model_textio_read]

   # [START model_textio_write]
   filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
   # [START model_pipelineio_write]
-  filtered_words | beam.io.Write(
-      'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
-                                          file_name_suffix='.csv'))
+  filtered_words | 'WriteToText' >> beam.io.WriteToText(
+      'gs://my_bucket/path/to/numbers', file_name_suffix='.csv')
   # [END model_pipelineio_write]
   # [END model_textio_write]

@@ -1014,7 +1010,7 @@ def model_composite_transform_example(contents, output_path):
   (p
    | beam.Create(contents)
    | CountWords()
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
   p.run()

@@ -1050,7 +1046,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
       # A list of tuples can be "piped" directly into a Flatten transform.
       | beam.Flatten())
   # [END model_multiple_pcollections_flatten]
-  merged | beam.io.Write(beam.io.TextFileSink(output_path))
+  merged | beam.io.WriteToText(output_path)
   p.run()

@@ -1083,7 +1079,7 @@ def model_multiple_pcollections_partition(contents, output_path):

   ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
    | beam.Flatten()
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
   p.run()

@@ -1113,7 +1109,7 @@ def model_group_by_key(contents, output_path):
   # [END model_group_by_key_transform]
   (grouped_words
    | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
   p.run()

@@ -1151,7 +1147,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   contact_lines = result | beam.Map(join_info)
   # [END model_group_by_key_cogroupbykey_tuple]
-  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  contact_lines | beam.io.WriteToText(output_path)
   p.run()

@@ -1190,7 +1186,7 @@ def model_join_using_side_inputs(
   contact_lines = names | beam.core.Map(
       "CreateContacts", join_info, AsIter(emails), AsIter(phones))
   # [END model_join_using_side_inputs]
-  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  contact_lines | beam.io.WriteToText(output_path)
   p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 1a84a6e..a43e1e0 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -23,12 +23,12 @@ import os
 import sys
 import tempfile
 import unittest
+import uuid

 import apache_beam as beam
-from apache_beam import io
+from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
-from apache_beam.io import fileio
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.utils.pipeline_options import TypeOptions
@@ -36,9 +36,6 @@ from apache_beam.examples.snippets import snippets

 # pylint: disable=expression-not-assigned

-# Monky-patch to use native sink for file path re-writing.
-io.TextFileSink = fileio.NativeTextFileSink
-

 class ParDoTest(unittest.TestCase):
   """Tests for dataflow/model/par-do."""
@@ -106,7 +103,8 @@ class ParDoTest(unittest.TestCase):
     # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
-    result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
+    result = words | 'CountUniqueLetters' >> beam.Map(
+        lambda word: len(set(word)))
     # [END model_pardo_with_label]

     self.assertEqual({1, 2, 4}, set(result))
@@ -350,6 +348,80 @@ class TypeHintsTest(unittest.TestCase):


 class SnippetsTest(unittest.TestCase):
+  # Replacing text read/write transforms with dummy transforms for testing.
+  class DummyReadTransform(beam.PTransform):
+    """A transform that will replace iobase.ReadFromText.
+
+    To be used for testing.
+    """
+
+    def __init__(self, file_to_read=None):
+      self.file_to_read = file_to_read
+
+    class ReadDoFn(beam.DoFn):
+
+      def __init__(self, file_to_read):
+        self.file_to_read = file_to_read
+        self.coder = coders.StrUtf8Coder()
+
+      def process(self, context):
+        pass
+
+      def finish_bundle(self, context):
+        assert self.file_to_read
+        for file_name in glob.glob(self.file_to_read):
+          with open(file_name) as file:
+            for record in file:
+              yield self.coder.decode(record.rstrip('\n'))
+
+    def expand(self, pcoll):
+      return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo(
+          SnippetsTest.DummyReadTransform.ReadDoFn(self.file_to_read))
+
+  class DummyWriteTransform(beam.PTransform):
+    """A transform that will replace iobase.WriteToText.
+
+    To be used for testing.
+    """
+
+    def __init__(self, file_to_write=None, file_name_suffix=''):
+      self.file_to_write = file_to_write
+
+    class WriteDoFn(beam.DoFn):
+
+      def __init__(self, file_to_write):
+        self.file_to_write = file_to_write
+        self.file_obj = None
+        self.coder = coders.ToStringCoder()
+
+      def start_bundle(self, context):
+        assert self.file_to_write
+        self.file_to_write += str(uuid.uuid4())
+        self.file_obj = open(self.file_to_write, 'w')
+
+      def process(self, context):
+        assert self.file_obj
+        self.file_obj.write(self.coder.encode(context.element) + '\n')
+
+      def finish_bundle(self, context):
+        assert self.file_obj
+        self.file_obj.close()
+
+    def expand(self, pcoll):
+      return pcoll | 'DummyWriteForTesting' >> beam.ParDo(
+          SnippetsTest.DummyWriteTransform.WriteDoFn(self.file_to_write))
+
+  def setUp(self):
+    self.old_read_from_text = beam.io.ReadFromText
+    self.old_write_to_text = beam.io.WriteToText
+
+    # Monkey patching to allow testing pipelines defined in snippets.py using
+    # real data.
+    beam.io.ReadFromText = SnippetsTest.DummyReadTransform
+    beam.io.WriteToText = SnippetsTest.DummyWriteTransform
+
+  def tearDown(self):
+    beam.io.ReadFromText = self.old_read_from_text
+    beam.io.WriteToText = self.old_write_to_text

   def create_temp_file(self, contents=''):
     with tempfile.NamedTemporaryFile(delete=False) as f:
@@ -357,12 +429,16 @@ class SnippetsTest(unittest.TestCase):
       f.write(contents)
       return f.name

   def get_output(self, path, sorted_output=True, suffix=''):
-    with open(path + '-00000-of-00001' + suffix) as f:
-      lines = f.readlines()
+    all_lines = []
+    for file_name in glob.glob(path + '*'):
+      with open(file_name) as f:
+        lines = f.readlines()
+        all_lines.extend([s.rstrip('\n') for s in lines])
+
     if sorted_output:
-      return sorted(s.rstrip('\n') for s in lines)
+      return sorted(s.rstrip('\n') for s in all_lines)
     else:
-      return [s.rstrip('\n') for s in lines]
+      return all_lines

   def test_model_pipelines(self):
     temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
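A note on the reworked get_output helper above: beam.io.WriteToText names its
shards (e.g. path-00000-of-00001), while the test's DummyWriteTransform
appends a fresh uuid per bundle, so output can no longer be read from a single
well-known file name and has to be collected by pattern match instead. A
minimal sketch of that pattern, with collect_output as a hypothetical
stand-in for get_output:

    import glob

    def collect_output(path_prefix):
      # Gather every shard the write produced, strip trailing newlines,
      # and sort so assertions are independent of bundle ordering.
      lines = []
      for file_name in glob.glob(path_prefix + '*'):
        with open(file_name) as f:
          lines.extend(line.rstrip('\n') for line in f)
      return sorted(lines)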
