[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16070088#comment-16070088
 ] 

Aljoscha Krettek commented on FLINK-5031:
-----------------------------------------

Actually, the current behaviour seems a bit more complicated than "union". In 
this example (from [~RenkaiGe]'s PR):
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setBufferTimeout(1);

DataStream<Long> ds = env.generateSequence(0,11);

SplitStream<Long> consecutiveSplit = ds.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
                List<String> s = new ArrayList<String>();
                if (value <= 5) {
                        s.add("Less");
                } else {
                        s.add("GreaterEqual");
                }
                return s;
        }
}).select("Less")
        .split(new OutputSelector<Long>() {

                @Override
                public Iterable<String> select(Long value) {
                        List<String> s = new ArrayList<String>();
                        if (value % 2 == 0) {
                                s.add("Even");
                        } else {
                                s.add("Odd");
                        }
                        return s;
                }
        });

consecutiveSplit.select("Even").addSink(smallEvenSink);
consecutiveSplit.select("Odd").addSink(smallOddSink);
env.execute();
{code}
the output with the current master is {{0, 2, 4, 6, 8, 10}}. It works like 
this: each {{split()}} operation "attaches" tags to the elements, and only the 
last {{select()}} is taken into account when deciding what to forward. The 
example with "Less" from the issue description only seems to output the union 
because both {{split()}} operations attach the same tag name. In the 
"Less"/"Even" example you can see that only the "Even" elements are selected; 
the "Less" tag is not taken into account.

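To make this concrete, here is a small plain-Java model of the behaviour as described above. It does not use the Flink API and does not mirror the actual internals; the class and method names ({{SplitSelectModel}}, {{lastSelectOnly}}) are made up for illustration. Every selector adds its tag to the element, and only the name given to the last {{select()}} is checked.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model, not Flink code: it only reproduces the tagging behaviour
// described in this comment.
public class SplitSelectModel {

    // Runs all split() selectors on each value in [from, to], collects the tags
    // they attach, and forwards the value if the tags contain the name passed
    // to the last select().
    @SafeVarargs
    public static List<Long> lastSelectOnly(long from, long to, String lastSelect,
                                            Function<Long, String>... selectors) {
        List<Long> forwarded = new ArrayList<>();
        for (long value = from; value <= to; value++) {
            Set<String> tags = new HashSet<>();
            for (Function<Long, String> selector : selectors) {
                tags.add(selector.apply(value));
            }
            if (tags.contains(lastSelect)) {
                forwarded.add(value);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // "Less"/"Even" example from this comment.
        Function<Long, String> lessOrGreater = v -> v <= 5 ? "Less" : "GreaterEqual";
        Function<Long, String> evenOrOdd = v -> v % 2 == 0 ? "Even" : "Odd";
        System.out.println(lastSelectOnly(0, 11, "Even", lessOrGreater, evenOrOdd));
        // prints [0, 2, 4, 6, 8, 10]

        // "Less"/"Less" example from the issue description (thresholds 6 and 3).
        Function<Long, String> below6 = v -> v < 6 ? "Less" : "GreaterEqual";
        Function<Long, String> below3 = v -> v < 3 ? "Less" : "GreaterEqual";
        System.out.println(lastSelectOnly(1, 11, "Less", below6, below3));
        // prints [1, 2, 3, 4, 5]
    }
}
```

Under this model the second call reproduces the {{1, 2, 3, 4, 5}} from the issue description: both selectors attach "Less", so the first {{split()}} alone is enough to let 3, 4 and 5 through.
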
It would be easy to change the behaviour to a true union, i.e. 
{{0, 1, 2, 3, 4, 5, 6, 8, 10}}. In fact, I have a branch that does that: 
https://github.com/aljoscha/flink/tree/finish-pr-2847-split-select-intersection 
(The names/titles are misleading because I first tried implementing 
"intersection", but that is not quite possible currently.)
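
For comparison, the two alternative semantics for the "Less"/"Even" example can be written down in plain Java. This is only a sketch of the expected results, not Flink code and not what the branch does internally; {{UnionVsIntersection}}, {{LESS}}, {{EVEN}} and {{filter}} are hypothetical names.

```java
import java.util.List;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical illustration of "union" vs. "intersection" semantics for
// select("Less") followed by select("Even") on the sequence 0..11.
public class UnionVsIntersection {

    static final LongPredicate LESS = v -> v <= 5;      // first split, "Less"
    static final LongPredicate EVEN = v -> v % 2 == 0;  // second split, "Even"

    // Keeps the values of 0..11 that satisfy the given predicate.
    public static List<Long> filter(LongPredicate keep) {
        return LongStream.rangeClosed(0, 11)
                .filter(keep)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("union:        " + filter(LESS.or(EVEN)));
        // prints union:        [0, 1, 2, 3, 4, 5, 6, 8, 10]
        System.out.println("intersection: " + filter(LESS.and(EVEN)));
        // prints intersection: [0, 2, 4]
    }
}
```

The intersection result is what one would intuitively expect from chaining the two selections.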

Providing "intersection" behaviour without introducing an identity operator (as 
in [~RenkaiGe]'s PR) is not possible because of how the output is sent along 
from one operator to the next in a chain, and I would be very hesitant about 
adding a dummy operator for this case.

To conclude, I would actually favour not fixing this at all and instead 
deprecating split/select, because it is superseded by the strictly more 
powerful side outputs. What do you think, [~fhueske], as the original reporter 
of the issue?

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>       long threshold;
>       public ThresholdSelector(long threshold) {
>               this.threshold = threshold;
>       }
>       @Override
>       public Iterable<String> select(Long value) {
>               if (value < threshold) {
>                       return Collections.singletonList("Less");
>               } else {
>                       return Collections.singletonList("GreaterEqual");
>               }
>       }
> }
> public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
>       SplitStream<Long> split1 = env.generateSequence(1, 11)
>               .split(new ThresholdSelector(6));
>       // stream11 should be [1,2,3,4,5]
>       DataStream<Long> stream11 = split1.select("Less");
>       SplitStream<Long> split2 = stream11
> //            .map(new MapFunction<Long, Long>() {
> //                    @Override
> //                    public Long map(Long value) throws Exception {
> //                            return value;
> //                    }
> //            })
>               .split(new ThresholdSelector(3));
>       DataStream<Long> stream21 = split2.select("Less");
>       // stream21 should be [1,2]
>       stream21.print();
>       env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added 
> to the program.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
