[ 
https://issues.apache.org/jira/browse/BEAM-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750702#comment-16750702
 ] 

Maximilian Michels commented on BEAM-6473:
------------------------------------------

This looks like an issue with the Python SDK or the fuser. It fails in 
{{QueryablePipeline}}: 
https://github.com/apache/beam/blob/fc4c6baff97a1c6efbe5d09a5207184ea1818b3b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L227

It seems like {{side3}} is never produced when {{side}} is used as a side input 
to another transform. Copying the {{side}} collection (see {{side_copy}} below) 
makes the test pass:

{code:python}
  def test_flattened_side_input(self):
    with self.create_pipeline() as p:
      main = p | 'main' >> beam.Create([None])
      side1 = p | 'side1' >> beam.Create([('a', 1)])
      side2 = p | 'side2' >> beam.Create([('b', 2)])
      side3 = p | 'side3' >> beam.Create(['another type'])
      side = (side1, side2) | beam.Flatten()
      # Making a copy here works, but using 'side' below in
      # CheckFlattenOfSideInput does not.
      side_copy = (side1, side2) | "side_copy" >> beam.Flatten()
      assert_that(
          main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
          equal_to([(None, {'a': 1, 'b': 2})]),
          label='CheckFlattenAsSideInput')
      assert_that(
          # This was (side, side3) before
          (side_copy, side3) | 'FlattenAfter' >> beam.Flatten(),
          equal_to([('a', 1), ('b', 2), ('another type')]),
          label='CheckFlattenOfSideInput')
{code}
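
For reference, the "consumed but never produced" check that trips here can be approximated in a few lines. This is only a simplified sketch under my own assumptions, not the actual Java code in {{QueryablePipeline}}: the dict-based graph, the function name {{find_unproduced_pcollections}}, and the PCollection ids are all hypothetical stand-ins for the pipeline proto.

```python
from typing import Dict, List, Set


def find_unproduced_pcollections(
    transforms: Dict[str, Dict[str, List[str]]]) -> Set[str]:
  """Returns ids of PCollections that some transform reads but none writes.

  `transforms` is a hypothetical, simplified stand-in for the pipeline proto:
  it maps a transform name to its 'inputs' and 'outputs' PCollection ids.
  """
  produced = set()
  consumed = set()
  for spec in transforms.values():
    produced.update(spec.get('outputs', []))
    consumed.update(spec.get('inputs', []))
  return consumed - produced


# Hypothetical graph in which the producer of 'side3_out' has been dropped
# while 'FlattenAfter' still reads from it.
broken_graph = {
    'side1': {'inputs': [], 'outputs': ['side1_out']},
    'side2': {'inputs': [], 'outputs': ['side2_out']},
    'Flatten': {'inputs': ['side1_out', 'side2_out'], 'outputs': ['side_out']},
    'FlattenAfter': {'inputs': ['side_out', 'side3_out'],
                     'outputs': ['flat_out']},
}

# Same graph with the 'side3' producer restored.
fixed_graph = dict(broken_graph)
fixed_graph['side3'] = {'inputs': [], 'outputs': ['side3_out']}

unproduced_in_broken = find_unproduced_pcollections(broken_graph)
unproduced_in_fixed = find_unproduced_pcollections(fixed_graph)
```

If the graph handed to the runner looks like {{broken_graph}}, a check of this kind would flag {{side3_out}}, which matches the shape of the error in the issue description. Whether the producer is dropped by the SDK or by the fuser is exactly the open question.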

[~robertwb], do you have a pointer on where to look first?


> Python Flink ValidatesRunner test_flattened_side_input fails
> ------------------------------------------------------------
>
>                 Key: BEAM-6473
>                 URL: https://issues.apache.org/jira/browse/BEAM-6473
>             Project: Beam
>          Issue Type: Test
>          Components: runner-flink, sdk-py-core
>            Reporter: Maximilian Michels
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The {{test_flattened_side_input}} test fails after merging 
> [https://github.com/apache/beam/pull/7456]
> {noformat}
> ERROR: test_flattened_side_input (__main__.FlinkRunnerTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
> File "apache_beam/runners/portability/fn_api_runner_test.py", line 205, in 
> test_flattened_side_input
> label='CheckFlattenOfSideInput')
> File "apache_beam/pipeline.py", line 425, in __exit__
> self.run().wait_until_finish()
> File "apache_beam/runners/portability/portable_runner.py", line 349, in 
> wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline 
> test_flattened_side_input_1547859357.36_07dcde9b-acfc-4e8d-b930-582f7637a07e 
> failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes 
> [PCollectionNode
> {id=ref_PCollection_PCollection_12, PCollection=unique_name: 
> "17side3/Map(decode).None" coder_id: "ref_Coder_BytesCoder_1" is_bounded: 
> BOUNDED windowing_strategy_id: "ref_Windowing_Windowing_1" }
> ] were consumed but never produced
> {noformat}
> [https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/134/console]



