Wal8800 opened a new issue, #23228:
URL: https://github.com/apache/beam/issues/23228

   ### What happened?
   
   When a custom `DoFn` returns `None` for some elements and the DirectRunner runs with two or more workers (`--direct_num_workers 2`), the pipeline fails with the following error if every input element makes the `DoFn` return `None`.
   
   Error:
   
   ```
     File "/home/wal8800/workspace/test_project/venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
       for ix, part in enumerate(input.partition(self._num_workers)):
   AttributeError: 'NoneType' object has no attribute 'partition'
   ```
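The failing line calls `.partition()` on the buffered input for a stage, and the traceback shows that this value is `None` when the upstream `DoFn` emitted nothing at all. The sketch below is a hypothetical illustration of the kind of guard that would avoid the `AttributeError`. `ListBuffer` and `partition_input` are made-up stand-ins, not Beam's actual classes or the real fix. It assumes that a missing buffer can safely be treated as empty input, and its round-robin split is arbitrary.

```python
from typing import List, Optional


class ListBuffer:
    """Hypothetical stand-in for a runner-side element buffer (not Beam's class)."""

    def __init__(self, elements: List[str]):
        self.elements = elements

    def partition(self, n: int) -> List[List[str]]:
        # Round-robin the buffered elements into n partitions (arbitrary choice).
        parts: List[List[str]] = [[] for _ in range(n)]
        for i, element in enumerate(self.elements):
            parts[i % n].append(element)
        return parts


def partition_input(buffer: Optional[ListBuffer], num_workers: int) -> List[List[str]]:
    """Hypothetical helper: treat a missing buffer as empty instead of crashing."""
    if buffer is None:
        # Nothing was ever buffered upstream, so every worker gets an empty partition.
        return [[] for _ in range(num_workers)]
    return buffer.partition(num_workers)


# Calling None.partition(2) directly would raise the AttributeError from the report.
for ix, part in enumerate(partition_input(None, 2)):
    print(ix, part)
```

With the guard in place, each of the two workers simply receives an empty partition instead of the stage crashing.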
   
   Example pipeline script:
   
   ```python
   import argparse
   import logging
   import re
   
   import apache_beam as beam
   from apache_beam.io import ReadFromText
   from apache_beam.io import WriteToText
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.options.pipeline_options import SetupOptions
   
   
   class WordExtractingDoFn(beam.DoFn):
       """Parse each line of input text into words."""
   
       def __init__(self):
           beam.DoFn.__init__(self)
   
       def process(self, element, *args, **kwargs):
           text_line = element.strip()
           if not text_line:
               return
   
           return re.findall(r'[\w\']+', text_line, re.UNICODE)
   
   
   def main(argv=None, save_main_session=True):
       """Main entry point; defines and runs the wordcount pipeline."""
       parser = argparse.ArgumentParser()
       parser.add_argument(
           '--input',
           dest='input',
           default='gs://dataflow-samples/shakespeare/kinglear.txt',
           help='Input file to process.')
       parser.add_argument(
           '--output',
           dest='output',
           required=True,
           help='Output file to write results to.')
       known_args, pipeline_args = parser.parse_known_args(argv)
   
       pipeline_options = PipelineOptions(pipeline_args)
       pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
       p = beam.Pipeline(options=pipeline_options)
   
       lines = p | 'read' >> ReadFromText(known_args.input)
   
       def count_ones(word_ones):
           (word, ones) = word_ones
           return (word, sum(ones))
   
       counts = (
               lines
               | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
               | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
               | 'group' >> beam.GroupByKey()
               | 'count' >> beam.Map(count_ones))
   
       # Format the counts into a PCollection of strings.
       def format_result(word_count):
           (word, count) = word_count
           return '%s: %d' % (word, count)
   
       output = counts | 'format' >> beam.Map(format_result)
       output | 'write' >> WriteToText(known_args.output)
   
       result = p.run()
       result.wait_until_finish()
   
   
   if __name__ == '__main__':
       logging.getLogger().setLevel(logging.INFO)
       main()
   ```
   
   Example `input.txt` (every line is empty):
   
   ```
   
   
   
   ```
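To see why this input triggers the bug, the plain-Python sketch below copies the body of `WordExtractingDoFn.process` from the script above into a standalone function. The function name `extract_words` and the in-memory `lines` list, which stands in for reading `input.txt`, are illustrative assumptions. Beam treats a `None` return from `process` as "no output for this element", so with this input the `split` step emits no elements at all.

```python
import re
from typing import List, Optional


def extract_words(element: str) -> Optional[List[str]]:
    """Same logic as WordExtractingDoFn.process in the script above."""
    text_line = element.strip()
    if not text_line:
        # Blank line: process() returns None, i.e. nothing is emitted for it.
        return None
    return re.findall(r'[\w\']+', text_line, re.UNICODE)


# Hypothetical stand-in for the example input.txt: three empty lines.
lines = ['', '', '']
results = [extract_words(line) for line in lines]
print(results)  # [None, None, None]

# Flatten whatever was returned; every result is None, so nothing is emitted.
emitted = [word for r in results if r is not None for word in r]
print(len(emitted))  # 0
```

A non-empty line such as `"It's a test"` would instead return `["It's", "a", "test"]`.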
   
   Example command:
   
   ```
   python bug_example.py --runner direct --direct_running_mode in_memory --direct_num_workers 2 --input input.txt --output output.txt
   ```
   
   Reproduced with Apache Beam `2.41.0`.
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: runner-direct

