[ https://issues.apache.org/jira/browse/BEAM-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750702#comment-16750702 ]
Maximilian Michels commented on BEAM-6473:
------------------------------------------

This looks like an issue with the Python SDK or the fuser. It fails in {{QueryablePipeline}}:
https://github.com/apache/beam/blob/fc4c6baff97a1c6efbe5d09a5207184ea1818b3b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L227

It seems like {{side3}} is never produced when {{side}} is used as a side input to another transform and is also flattened with {{side3}}. Making a copy of the {{side}} collection and flattening the copy with {{side3}} instead makes the test pass:

{code:python}
# Test method from apache_beam/runners/portability/fn_api_runner_test.py. Assumes the
# module imports `apache_beam as beam` and `assert_that`, `equal_to` from
# apache_beam.testing.util.
def test_flattened_side_input(self):
  with self.create_pipeline() as p:
    main = p | 'main' >> beam.Create([None])
    side1 = p | 'side1' >> beam.Create([('a', 1)])
    side2 = p | 'side2' >> beam.Create([('b', 2)])
    side3 = p | 'side3' >> beam.Create(['another type'])
    side = (side1, side2) | beam.Flatten()
    # Making a copy here works, but using 'side' below in CheckFlattenOfSideInput does not
    side_copy = (side1, side2) | "side_copy" >> beam.Flatten()
    assert_that(
        main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
        equal_to([(None, {'a': 1, 'b': 2})]),
        label='CheckFlattenAsSideInput')
    assert_that(
        # This was (side, side3) before
        (side_copy, side3) | 'FlattenAfter' >> beam.Flatten(),
        equal_to([('a', 1), ('b', 2), ('another type')]),
        label='CheckFlattenOfSideInput')
{code}

[~robertwb], do you have a pointer on where to look first?
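To make the failing validation concrete, here is a minimal, hypothetical Python model of what the error message describes: collect every PCollection id that some transform consumes, subtract every id that some transform produces, and fail if anything is left over. The {{Transform}} class, {{unproduced_pcollections}}, {{check_pipeline}} and the toy graph are illustrative assumptions, not Beam APIs and not the actual Java logic in {{QueryablePipeline}}. The "broken" graph simply deletes the producer of {{side3}} to mimic the symptom; it does not claim that this is what the fuser actually does.

```python
"""Toy model of a "consumed but never produced" pipeline check.

Hypothetical sketch: Transform and the graphs below are simplified stand-ins,
not Beam's pipeline protos or the Java QueryablePipeline class.
"""
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Transform:
    name: str
    inputs: List[str] = field(default_factory=list)   # PCollection ids consumed
    outputs: List[str] = field(default_factory=list)  # PCollection ids produced


def unproduced_pcollections(transforms: List[Transform]) -> Set[str]:
    """Return ids that some transform consumes but no transform produces."""
    produced: Set[str] = set()
    consumed: Set[str] = set()
    for t in transforms:
        produced.update(t.outputs)
        consumed.update(t.inputs)
    return consumed - produced


def check_pipeline(transforms: List[Transform]) -> None:
    """Raise if any consumed PCollection has no producer in the graph."""
    missing = unproduced_pcollections(transforms)
    if missing:
        raise ValueError(
            f"PCollections {sorted(missing)} were consumed but never produced")


# Roughly the shape of test_flattened_side_input: 'side' is both a side input
# of the Map and a main input of FlattenAfter, together with 'side3'.
full_graph = [
    Transform("main", outputs=["main"]),
    Transform("side1", outputs=["side1"]),
    Transform("side2", outputs=["side2"]),
    Transform("side3", outputs=["side3"]),
    Transform("Flatten", inputs=["side1", "side2"], outputs=["side"]),
    Transform("Map", inputs=["main", "side"], outputs=["mapped"]),
    Transform("FlattenAfter", inputs=["side", "side3"], outputs=["flat"]),
]

# Hypothetical rewrite that loses the producer of 'side3' but keeps its consumer.
broken_graph = [t for t in full_graph if t.name != "side3"]

if __name__ == "__main__":
    check_pipeline(full_graph)  # every input has a producer, so no error
    try:
        check_pipeline(broken_graph)
    except ValueError as e:
        print(e)  # PCollections ['side3'] were consumed but never produced
```

The real pipeline proto also carries side-input specs, composite transforms and environments, which this model ignores; it only shows why a dangling input such as {{side3}} trips the check.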
> Python Flink ValidatesRunner test_flattened_side_input fails
> ------------------------------------------------------------
>
> Key: BEAM-6473
> URL: https://issues.apache.org/jira/browse/BEAM-6473
> Project: Beam
> Issue Type: Test
> Components: runner-flink, sdk-py-core
> Reporter: Maximilian Michels
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The {{test_flattened_side_input}} test fails after merging
> [https://github.com/apache/beam/pull/7456]
> {noformat}
> ERROR: test_flattened_side_input (__main__.FlinkRunnerTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "apache_beam/runners/portability/fn_api_runner_test.py", line 205, in test_flattened_side_input
>     label='CheckFlattenOfSideInput')
>   File "apache_beam/pipeline.py", line 425, in __exit__
>     self.run().wait_until_finish()
>   File "apache_beam/runners/portability/portable_runner.py", line 349, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline test_flattened_side_input_1547859357.36_07dcde9b-acfc-4e8d-b930-582f7637a07e failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_12, PCollection=unique_name: "17side3/Map(decode).None"
> coder_id: "ref_Coder_BytesCoder_1"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"
> }] were consumed but never produced
> {noformat}
> [https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/134/console]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)