[ https://issues.apache.org/jira/browse/BEAM-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17535035#comment-17535035 ]
Beam JIRA Bot commented on BEAM-13040:
--------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 days. It is now automatically moved to P3. If you are still affected by it, you can comment and move it back to P2.

AsIter side input is not correctly recognized as a dependency.
--------------------------------------------------------------

Key: BEAM-13040
URL: https://issues.apache.org/jira/browse/BEAM-13040
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Environment: Linux Debian 5.10 x86_64
             Python 3.8.11
Reporter: Jongbin Park
Priority: P3
Time Spent: 50m
Remaining Estimate: 0h

The error occurs on current master (HEAD). It does not occur on the latest release (2.33.0).

Example to reproduce:
{code:python}
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):
    """Prints the step name and the contents of its side inputs, then emits the name."""

    def __init__(self, name):
        super().__init__()
        self._name = name

    def process(self, element, *side_inputs):
        # Materialize each AsIter side input so the log shows what this step could see.
        print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
        return [self._name]


class BeamDagTest(unittest.TestCase):

    def test_dag(self):
        with beam.Pipeline() as p:
            root = p | 'CreateRoot' >> beam.Create([None])
            example_gen = root | 'CsvExampleGen' >> beam.ParDo(
                LoggingFn('CsvExampleGen'),
            )
            statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
                LoggingFn('StatisticsGen'),
                beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
            )
            schema_gen = root | 'SchemaGen' >> beam.ParDo(
                LoggingFn('SchemaGen'),
                beam.pvalue.AsIter(statistics_gen),
            )
            example_validator = root | 'ExampleValidator' >> beam.ParDo(
                LoggingFn('ExampleValidator'),
                beam.pvalue.AsIter(statistics_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            transform = root | 'Transform' >> beam.ParDo(
                LoggingFn('Transform'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            trainer = root | 'Trainer' >> beam.ParDo(
                LoggingFn('Trainer'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
                beam.pvalue.AsIter(transform),
            )
            model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
                LoggingFn('latest_blessed_model_resolver'),
            )
            evaluator = root | 'Evaluator' >> beam.ParDo(
                LoggingFn('Evaluator'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(model_resolver),
            )
            pusher = root | 'Pusher' >> beam.ParDo(
                LoggingFn('Pusher'),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(evaluator),
            )


if __name__ == '__main__':
    unittest.main()
{code}

According to the AsIter [documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527], the entire PCollection should be made available as a side input, which means the PTransform producing the side input should run before the PTransform that consumes it.
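The side inputs in the reproduction define a dependency graph between the steps. As a minimal, Beam-independent sketch (the `DEPS` mapping and the `topological_order` helper are hypothetical, written only for illustration), the snippet below records which upstream steps each step reads through `AsIter` and computes one execution order that respects every edge, using Kahn's algorithm. Assuming each step processes its single element once and `AsIter` dependencies are honored, the order in which the steps start has to be a valid topological order of this graph.

{code:python}
from collections import deque

# Hypothetical encoding of the reproduction: step name -> steps whose output
# it reads through beam.pvalue.AsIter(...), in the order passed to ParDo.
DEPS = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def topological_order(deps):
    """Return one order in which every step comes after all of its dependencies.

    Kahn's algorithm: repeatedly emit a step whose dependencies have all been
    emitted. Raises ValueError if the graph contains a cycle.
    """
    unmet = {step: len(parents) for step, parents in deps.items()}
    children = {step: [] for step in deps}
    for step, parents in deps.items():
        for parent in parents:
            children[parent].append(step)

    ready = deque(step for step, count in unmet.items() if count == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in children[step]:
            unmet[child] -= 1
            if unmet[child] == 0:
                ready.append(child)

    if len(order) != len(deps):
        raise ValueError('dependency graph contains a cycle')
    return order


if __name__ == '__main__':
    print(topological_order(DEPS))
{code}

With `DEPS` written in this order, the result is CsvExampleGen, latest_blessed_model_resolver, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher, which matches the order printed by apache-beam 2.33.0 in this report. Other orders can be equally valid; the graph only constrains producer-before-consumer pairs.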
We used to > exploit this feature to run DAG of tasks by injecting task dependency with > side inputs, however this mechanism does not work properly in current master > (71d7213d98): > Output with apache-beam==2.33.0: > {code:java} > Running CsvExampleGen (side inputs: []) > Running latest_blessed_model_resolver (side inputs: []) > Running StatisticsGen (side inputs: [['CsvExampleGen']]) > Running SchemaGen (side inputs: [['StatisticsGen']]) > Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']]) > Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]) > Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], > ['Transform']]) > Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], > ['latest_blessed_model_resolver']]) > Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code} > Output with apache-beam installed from 71d7213d98 (origin/master): > {code:java} > Running CsvExampleGen (side inputs: []) > Running latest_blessed_model_resolver (side inputs: []) > Running StatisticsGen (side inputs: [['CsvExampleGen']]) > Running Pusher (side inputs: [[], []]) > Running Evaluator (side inputs: [['CsvExampleGen'], [], > ['latest_blessed_model_resolver']]) > Running SchemaGen (side inputs: [['StatisticsGen']]) > Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []]) > Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']]) > Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code} -- This message was sent by Atlassian Jira (v8.20.7#820007)