[ https://issues.apache.org/jira/browse/FLINK-17578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek reassigned FLINK-17578:
----------------------------------------

    Assignee: Danish Amjad

> Union of 2 SideOutputs behaviour incorrect
> ------------------------------------------
>
>                 Key: FLINK-17578
>                 URL: https://issues.apache.org/jira/browse/FLINK-17578
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Tom Wells
>            Assignee: Danish Amjad
>            Priority: Major
>
> Strange behaviour when using union() to merge outputs of 2 DataStreams, where both are sourced from SideOutputs.
> See example code with comments demonstrating the issue:
> {code:java}
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = env.fromElements(1, 2, 3, 4)
>     val oddTag = OutputTag[Int]("odds")
>     val evenTag = OutputTag[Int]("even")
>     val all =
>       input.process {
>         (value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]) => {
>           if (value % 2 != 0)
>             ctx.output(oddTag, value)
>           else
>             ctx.output(evenTag, value)
>         }
>       }
>     val odds = all.getSideOutput(oddTag)
>     val evens = all.getSideOutput(evenTag)
>     // These print correctly
>     //
>     odds.print // -> 1, 3
>     evens.print // -> 2, 4
>     // This prints incorrectly - BUG?
>     //
>     odds.union(evens).print // -> 2, 2, 4, 4
>     evens.union(odds).print // -> 1, 1, 3, 3
>     // Another test to understand normal behaviour of .union, using normal inputs
>     //
>     val odds1 = env.fromElements(1, 3)
>     val evens1 = env.fromElements(2, 4)
>     // Union of 2 normal inputs
>     //
>     odds1.union(evens1).print // -> 1, 2, 3, 4
>     // Union of a normal input plus an input from a sideoutput
>     //
>     odds.union(evens1).print // -> 1, 2, 3, 4
>     evens1.union(odds).print // -> 1, 2, 3, 4
>     //
>     // So it seems that when both inputs are from sideoutputs that it behaves incorrectly... BUG?
>     env.execute("Test job")
>   }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)