[ https://issues.apache.org/jira/browse/BEAM-10144?focusedWorklogId=593505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-593505 ]
ASF GitHub Bot logged work on BEAM-10144: ----------------------------------------- Author: ASF GitHub Bot Created on: 07/May/21 20:39 Start Date: 07/May/21 20:39 Worklog Time Spent: 10m Work Description: davidcavazos commented on a change in pull request #14738: URL: https://github.com/apache/beam/pull/14738#discussion_r628503768 ########## File path: sdks/python/apache_beam/examples/snippets/snippets.py ########## @@ -110,80 +115,71 @@ def filter_words(unused_x): import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions - with beam.Pipeline(options=PipelineOptions()) as p: + beam_options = PipelineOptions() + with beam.Pipeline(options=beam_options) as pipeline: pass # build your pipeline here # [END pipelines_constructing_creating] - with TestPipeline() as p: # Use TestPipeline for testing. - # pylint: disable=line-too-long + # [START pipelines_constructing_reading] + lines = pipeline | 'ReadMyFile' >> beam.io.ReadFromText( + 'gs://some/inputData.txt') + # [END pipelines_constructing_reading] - # [START pipelines_constructing_reading] - lines = p | 'ReadMyFile' >> beam.io.ReadFromText( - 'gs://some/inputData.txt') - # [END pipelines_constructing_reading] + # [START pipelines_constructing_applying] + words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + reversed_words = words | ReverseWords() + # [END pipelines_constructing_applying] - # [START pipelines_constructing_applying] - words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - reversed_words = words | ReverseWords() - # [END pipelines_constructing_applying] + # [START pipelines_constructing_writing] + filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words) + filtered_words | 'WriteMyFile' >> beam.io.WriteToText( + 'gs://some/outputData.txt') + # [END pipelines_constructing_writing] - # [START pipelines_constructing_writing] - filtered_words = reversed_words | 'FilterWords' >> beam.Filter( - filter_words) - 
filtered_words | 'WriteMyFile' >> beam.io.WriteToText( - 'gs://some/outputData.txt') - # [END pipelines_constructing_writing] + pipeline.visit(SnippetUtils.RenameFiles(renames)) - p.visit(SnippetUtils.RenameFiles(renames)) - -def model_pipelines(argv): +def model_pipelines(): """A wordcount snippet as a simple pipeline example.""" # [START model_pipelines] + import argparse import re + import sys import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions - class MyOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument( - '--input', - dest='input', - default='gs://dataflow-samples/shakespeare/kinglear' - '.txt', - help='Input file to process.') - parser.add_argument( - '--output', - dest='output', - required=True, - help='Output file to write results to.') - - pipeline_options = PipelineOptions(argv) - my_options = pipeline_options.view_as(MyOptions) - - with beam.Pipeline(options=pipeline_options) as p: + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + default='gs://dataflow-samples/shakespeare/kinglear.txt', + help='Input file to process.') + parser.add_argument( + '--output', required=True, help='Output file to write results to.') + args, beam_args = parser.parse_known_args(sys.argv) + beam_options = PipelineOptions(beam_args) + with beam.Pipeline(options=beam_options) as pipeline: ( - p - | beam.io.ReadFromText(my_options.input) + pipeline + | beam.io.ReadFromText(args.input) | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) | beam.Map(lambda x: (x, 1)) | beam.combiners.Count.PerKey() - | beam.io.WriteToText(my_options.output)) + | beam.io.WriteToText(args.output)) # [END model_pipelines] -def model_pcollection(argv): +def model_pcollection(output_path): """Creating a PCollection from data in local memory.""" # [START model_pcollection] + import sys + import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions - # argv = 
None # if None, uses sys.argv - pipeline_options = PipelineOptions(argv) - with beam.Pipeline(options=pipeline_options) as pipeline: + beam_options = PipelineOptions(sys.argv) Review comment: > Would something like this work for you? For some reason, when I mock `sys.argv` (and I've spent several hours trying multiple different ways), the `args, beam_args = parser.parse_known_args()` call in the sample doesn't pick up the mocked value unless I explicitly pass `sys.argv`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 593505) Time Spent: 2h 50m (was: 2h 40m) > Update pipeline options snippets for best practices > --------------------------------------------------- > > Key: BEAM-10144 > URL: https://issues.apache.org/jira/browse/BEAM-10144 > Project: Beam > Issue Type: Improvement > Components: examples-python > Reporter: David Cavazos > Priority: P3 > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)