[ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712255#comment-16712255
 ] 

Shimin Yang commented on FLINK-11084:
-------------------------------------

Hi [~aljoscha], I don't agree with that. From an application developer's point 
of view, this can introduce dirty data and degrade data quality. Users may use 
this API in more complicated logic than two directly consecutive splits.

For example:

// first split: separate regular log records from error records
SplitStream splitLog = dataStream.split(...);
DataStream logStream = splitLog.select(...);
DataStream errorStream = splitLog.select(...);

errorStream.map(...).filter(...).addSink(...);

// second split, applied to a stream selected from the first split
SplitStream splitLogStream = logStream.split(...);

DataStream stream1 = splitLogStream.select(...).map(...);
stream1.addSink(...);

DataStream stream2 = splitLogStream.select(...).map(...);
stream2.addSink(...);

 

This will produce wrong data that is very hard to debug. I think that as long 
as we still provide this API, we should make it work correctly. I have an 
implementation for this, and if that doesn't work out, we should at least throw 
an exception to tell the user that two consecutive split and select operations 
are not supported.
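
To make the failure mode concrete, here is a minimal sketch in plain Java with no Flink dependency. It is only a simplified model of the behaviour described in the issue, not Flink's actual DirectedOutput code. The names SplitSelectModel, Stage, flatRouting, chainedRouting and exampleStages are hypothetical. The sketch also assumes that each selector returns a single name, and that the flat evaluation pools the names from all selectors and then checks only the name requested by the last select.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class SplitSelectModel {

    /** One split(...).select(...) step: a selector plus the name passed to select. */
    public static final class Stage {
        final Function<String, String> selector;
        final String selectedName;

        public Stage(Function<String, String> selector, String selectedName) {
            this.selector = selector;
            this.selectedName = selectedName;
        }
    }

    /**
     * Flat model (assumption): every selector is evaluated on the record
     * independently, the resulting names are pooled, and the record is emitted
     * if the pool contains the name requested by the last select.
     */
    public static boolean flatRouting(String record, List<Stage> stages) {
        Set<String> names = new HashSet<>();
        for (Stage stage : stages) {
            names.add(stage.selector.apply(record));
        }
        return names.contains(stages.get(stages.size() - 1).selectedName);
    }

    /**
     * Chained model (what a user would expect): a record reaches a later
     * stage only if it was selected by every earlier stage.
     */
    public static boolean chainedRouting(String record, List<Stage> stages) {
        for (Stage stage : stages) {
            if (!stage.selectedName.equals(stage.selector.apply(record))) {
                return false;
            }
        }
        return true;
    }

    /** The scenario from the issue: split(sel1).select("name2").split(sel2).select("name3"). */
    public static List<Stage> exampleStages() {
        List<Stage> stages = new ArrayList<>();
        stages.add(new Stage(record -> "name1", "name2")); // outputSelector1 => "name1"
        stages.add(new Stage(record -> "name3", "name3")); // outputSelector2 => "name3"
        return stages;
    }

    public static void main(String[] args) {
        List<Stage> stages = exampleStages();
        System.out.println("flat:    " + flatRouting("StreamRecord1", stages));
        System.out.println("chained: " + chainedRouting("StreamRecord1", stages));
    }
}
```

In this model, the flat evaluation emits StreamRecord1 even though the first select("name2") should already have dropped it, while the chained evaluation drops it as expected.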

 

> Incorrect output after two consecutive split and select
> -------------------------------------------------------
>
>                 Key: FLINK-11084
>                 URL: https://issues.apache.org/jira/browse/FLINK-11084
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two successive split and select operations does 
> not actually depend on the first one. Both selectors are in the same array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
