[ https://issues.apache.org/jira/browse/BEAM-4826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570945#comment-16570945 ]

Luke Cwik commented on BEAM-4826:
---------------------------------

This seems like a case where the GreedyPipelineFuser is incorrectly creating an
ExecutableStage with a malformed flatten. Since the ExecutableStage contains
the pipeline components, the flatten that is inserted into the ExecutableStage
should have its inputs pruned there, keeping only the PCollections that are
available within that stage.
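The pruning described above can be sketched as follows. This is a hypothetical illustration, not the GreedyPipelineFuser's actual code: the plain dicts loosely mirror the proto text in the issue (transform id mapped to its URN, inputs, and outputs), and the helper name `prune_flatten_inputs` and the set of stage PCollections are assumptions made for the example.

```python
# Hypothetical sketch: these dicts loosely mirror the proto text in the
# issue; they are NOT Beam's actual runner classes.
FLATTEN_URN = "beam:transform:flatten:v1"


def prune_flatten_inputs(transforms, stage_pcollections):
    """Return a copy of `transforms` in which every Flatten keeps only the
    inputs whose PCollection id is in `stage_pcollections`."""
    pruned = {}
    for transform_id, transform in transforms.items():
        if transform["urn"] == FLATTEN_URN:
            kept = {
                local_name: pcoll_id
                for local_name, pcoll_id in transform["inputs"].items()
                if pcoll_id in stage_pcollections
            }
            # Build a new dict so the caller's transform is left untouched.
            transform = dict(transform, inputs=kept)
        pruned[transform_id] = transform
    return pruned


# Transforms from the issue's snippet: only n4 (output of e4) feeds the
# flatten inside this stage; n3 is assumed to be the stage's input.
stage_transforms = {
    "e4": {"urn": "urn:beam:transform:pardo:v1",
           "inputs": {"i0": "n3"}, "outputs": {"i0": "n4"}},
    "e7": {"urn": FLATTEN_URN,
           "inputs": {"i0": "n2", "i1": "n4", "i2": "n6"},
           "outputs": {"i0": "n7"}},
}
result = prune_flatten_inputs(stage_transforms, {"n3", "n4", "n7"})
print(result["e7"]["inputs"])  # {'i1': 'n4'}
```

Returning a pruned copy, rather than mutating the shared pipeline components, keeps the other stages that reference the same flatten unaffected.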

> Flink runner sends bad flatten to SDK
> -------------------------------------
>
>                 Key: BEAM-4826
>                 URL: https://issues.apache.org/jira/browse/BEAM-4826
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Henning Rohde
>            Assignee: Ankur Goenka
>            Priority: Major
>              Labels: portability
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> For a Go flatten test with 3 inputs, the Flink runner splits this into 3 bundle
> descriptors. However, each bundle descriptor still contains the original 3-input
> flatten, even though only 1 of its inputs is actually present. This is
> inconsistent, and the SDK shouldn't expect dangling PCollections. In contrast,
> Dataflow removes the flatten when it does the same split.
> Snippet:
> register: <
>   process_bundle_descriptor: <
>     id: "3"
>     transforms: <
>       key: "e4"
>       value: <
>         unique_name: "github.com/apache/beam/sdks/go/pkg/beam.createFn'1"
>         spec: <
>           urn: "urn:beam:transform:pardo:v1"
>           payload: [...]
>         >
>         inputs: <
>           key: "i0"
>           value: "n3"
>         >
>         outputs: <
>           key: "i0"
>           value: "n4"
>         >
>       >
>     >
>     transforms: <
>       key: "e7"
>       value: <
>         unique_name: "Flatten"
>         spec: <
>           urn: "beam:transform:flatten:v1"
>         >
>         inputs: <
>           key: "i0"
>           value: "n2"
>         >
>         inputs: <
>           key: "i1"
>           value: "n4"  # <-- the only input present in this descriptor
>         >
>         inputs: <
>           key: "i2"
>           value: "n6"
>         >
>         outputs: <
>           key: "i0"
>           value: "n7"
>         >
>       >
>     >
> [...]
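The inconsistency in the snippet can be detected mechanically. The sketch below is a hypothetical check, not an existing Beam SDK function: it reports every Flatten input that no transform in the descriptor produces. The dict shapes and the name `dangling_flatten_inputs` are assumptions for illustration, and only the two transforms shown in the snippet are modeled, since the rest of descriptor "3" is elided.

```python
# Hypothetical consistency check; dict shapes are illustrative only and
# are not Beam's real bundle-descriptor classes.
FLATTEN_URN = "beam:transform:flatten:v1"


def dangling_flatten_inputs(transforms):
    """Map each Flatten's transform id to the sorted input PCollection ids
    that are not an output of any transform in the descriptor."""
    produced = {
        pcoll_id
        for transform in transforms.values()
        for pcoll_id in transform["outputs"].values()
    }
    report = {}
    for transform_id, transform in transforms.items():
        if transform["urn"] != FLATTEN_URN:
            continue
        missing = sorted(
            pcoll_id for pcoll_id in transform["inputs"].values()
            if pcoll_id not in produced
        )
        if missing:
            report[transform_id] = missing
    return report


# The two transforms shown in the snippet for bundle descriptor "3".
descriptor_3 = {
    "e4": {"urn": "urn:beam:transform:pardo:v1",
           "inputs": {"i0": "n3"}, "outputs": {"i0": "n4"}},
    "e7": {"urn": FLATTEN_URN,
           "inputs": {"i0": "n2", "i1": "n4", "i2": "n6"},
           "outputs": {"i0": "n7"}},
}
print(dangling_flatten_inputs(descriptor_3))  # {'e7': ['n2', 'n6']}
```

A descriptor whose flatten has been pruned to `{"i1": "n4"}` would produce an empty report.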



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
