Wal8800 opened a new issue, #23228:
URL: https://github.com/apache/beam/issues/23228
### What happened?
When a custom `DoFn` can return `None` depending on the element, and the DirectRunner runs with two or more workers, the pipeline fails with the following error if the `DoFn` returns `None` for every input element.
Error:
```
  File "/home/wal8800/workspace/test_project/venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
```
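The traceback shows `input.partition(self._num_workers)` being called on `None`. Presumably the downstream stage's input buffer is missing because no element ever reached it. The toy model below is not Beam's code. `ToyBuffer` and `split_inputs` are hypothetical names that only mirror the shape of the failing line. The `guard` flag shows one possible mitigation: treat a missing buffer as empty before partitioning it across workers. This is a sketch of the failure mode, not the actual fix in Beam.

```python
from typing import Dict, List, Optional


class ToyBuffer:
  """Hypothetical stand-in for a runner's per-stage input buffer."""

  def __init__(self, elements: Optional[List[bytes]] = None):
    self.elements = list(elements or [])

  def partition(self, n: int) -> List[List[bytes]]:
    # Round-robin split of the buffered elements into n parts, one per worker.
    parts: List[List[bytes]] = [[] for _ in range(n)]
    for i, element in enumerate(self.elements):
      parts[i % n].append(element)
    return parts


def split_inputs(
    inputs: Dict[str, Optional[ToyBuffer]],
    num_workers: int,
    guard: bool = True) -> List[Dict[str, List[bytes]]]:
  """Hypothetical: distribute each named input across num_workers workers."""
  part_inputs: List[Dict[str, List[bytes]]] = [{} for _ in range(num_workers)]
  for name, input_buffer in inputs.items():
    if input_buffer is None and guard:
      # Treat a missing buffer as empty instead of calling .partition on None.
      input_buffer = ToyBuffer()
    # With guard=False and a None buffer, this line raises
    # AttributeError: 'NoneType' object has no attribute 'partition'.
    for ix, part in enumerate(input_buffer.partition(num_workers)):
      part_inputs[ix][name] = part
  return part_inputs


print(split_inputs({'group': None}, 2))  # → [{'group': []}, {'group': []}]
```

With `guard=False`, the same call reproduces the `AttributeError` from the report. With the guard, every worker simply receives an empty input.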
Example pipeline script:
```python
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""
  def __init__(self):
    beam.DoFn.__init__(self)

  def process(self, element, *args, **kwargs):
    text_line = element.strip()
    if not text_line:
      # Returning None emits no output for this element; when every input
      # line is blank, this step produces no elements at all.
      return
    return re.findall(r'[\w\']+', text_line, re.UNICODE)


def main(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  lines = p | 'read' >> ReadFromText(known_args.input)

  def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))

  counts = (
      lines
      | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
      | 'group' >> beam.GroupByKey()
      | 'count' >> beam.Map(count_ones))

  # Format the counts into a PCollection of strings.
  def format_result(word_count):
    (word, count) = word_count
    return '%s: %d' % (word, count)

  output = counts | 'format' >> beam.Map(format_result)
  output | 'write' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
```
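The input below triggers the bug because of how the `split` step parses it, and that parsing can be checked without running a pipeline. This sketch copies the body of `WordExtractingDoFn.process` into a plain function. The names `extract_words` and `count_outputs` are hypothetical and are not Beam APIs. For blank or whitespace-only lines the function returns `None`. An all-blank input therefore makes `split` emit nothing, which leaves the downstream `group` stage with no data.

```python
import re
from typing import Iterable, List, Optional


def extract_words(line: str) -> Optional[List[str]]:
  """Hypothetical helper with the same logic as WordExtractingDoFn.process."""
  text_line = line.strip()
  if not text_line:
    return None  # In Beam, a None return from process() means "no outputs".
  return re.findall(r'[\w\']+', text_line, re.UNICODE)


def count_outputs(lines: Iterable[str]) -> int:
  """Number of elements the 'split' step would emit for these lines."""
  return sum(len(extract_words(line) or []) for line in lines)


print(count_outputs(['', '   ', '']))          # → 0
print(count_outputs(['', 'the king', 'lear']))  # → 3
```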
Example `input.txt` (containing only empty lines):
```
```
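A file of this shape can be generated with plain Python. In this sketch `write_blank_input` is a hypothetical helper, and the default of three blank lines is an arbitrary choice. Any file whose lines are all empty should hit the same code path. Whitespace-only lines should too, since the `DoFn` calls `strip()` first.

```python
from pathlib import Path


def write_blank_input(path: str, num_lines: int = 3) -> None:
  """Write a file that contains only empty lines (num_lines is arbitrary)."""
  Path(path).write_text('\n' * num_lines)
```

For example, `write_blank_input('input.txt')` creates the file used by the command below.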
Example command:
```
python bug_example.py --runner direct --direct_running_mode in_memory \
  --direct_num_workers 2 --input input.txt --output output.txt
```
Reproduced with Apache Beam `2.41.0`.
### Issue Priority
Priority: 2
### Issue Component
Component: runner-direct