[ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100255#comment-16100255 ]
Aljoscha Krettek commented on FLINK-5031:
-----------------------------------------

In that case, I would suggest creating an {{OutputTag}} for every combination of tags in your values that you're interested in. Say {{OutputTag("a")}} would receive elements that have {{"tag1", "tag2"}} and {{OutputTag("b")}} would receive elements with tags {{"tag3", "tag5"}}. Inside the {{ProcessFunction}} you do the filtering based on the element tags and emit to the correct output tags. This is even more efficient than split/select because split/select creates more objects under the hood. Also, the output type of your {{ProcessFunction}} can be {{Void}} if you never need the main output.

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>
>     SplitStream<Long> split2 = stream11
> //      .map(new MapFunction<Long, Long>() {
> //          @Override
> //          public Long map(Long value) throws Exception {
> //              return value;
> //          }
> //      })
>         .split(new ThresholdSelector(3));
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>
>     env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to
> the program.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
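The routing suggested in the comment (one output per tag combination, with the filtering done per element) can be sketched without any Flink dependency. This is only an illustration of the decision logic, not Flink code: the {{TagRouter}} class, the {{Element}} type and the {{route}} helper are hypothetical names, and the rule "an element goes to an output when it carries all tags of that combination" is an assumption about the intended matching. In a real job the same check would sit inside {{ProcessFunction#processElement}}, and the collecting step would be a call to {{ctx.output(outputTag, value)}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TagRouter {

    /** Hypothetical element type: a payload plus the set of tags it carries. */
    public static final class Element {
        public final String payload;
        public final Set<String> tags;

        public Element(String payload, Set<String> tags) {
            this.payload = payload;
            this.tags = tags;
        }
    }

    // One named output per tag combination of interest.
    // The names "a" and "b" mirror OutputTag("a") and OutputTag("b") from the comment.
    static final Map<String, Set<String>> OUTPUTS = new LinkedHashMap<>();
    static {
        OUTPUTS.put("a", Set.of("tag1", "tag2"));
        OUTPUTS.put("b", Set.of("tag3", "tag5"));
    }

    /** Routes each element to every output whose tag combination it fully contains. */
    public static Map<String, List<String>> route(List<Element> elements) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String name : OUTPUTS.keySet()) {
            result.put(name, new ArrayList<>());
        }
        for (Element e : elements) {
            for (Map.Entry<String, Set<String>> output : OUTPUTS.entrySet()) {
                // Assumed matching rule: the element must carry all tags of the combination.
                if (e.tags.containsAll(output.getValue())) {
                    // In a Flink ProcessFunction this line would be ctx.output(outputTag, e).
                    result.get(output.getKey()).add(e.payload);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            new Element("x", Set.of("tag1", "tag2")),
            new Element("y", Set.of("tag3", "tag5")),
            new Element("z", Set.of("tag1", "tag2", "tag3", "tag5")),
            new Element("w", Set.of("tag4")));
        // prints {a=[x, z], b=[y, z]}
        System.out.println(route(input));
    }
}
```

Note that an element may land in several outputs (like "z") or in none (like "w"), which is the kind of routing that nested split/select calls were being used for in the report.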