Daniel Bali created FLINK-1986:
----------------------------------

             Summary: Group by fails on iterative data streams
                 Key: FLINK-1986
                 URL: https://issues.apache.org/jira/browse/FLINK-1986
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Daniel Bali

Hello!

When I try to run a `groupBy` on an IterativeDataStream I get a NullPointerException. Here is the code that reproduces the issue:

{code}
public Test() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    DataStream<Tuple2<Long, Long>> edges = env
            .generateSequence(0, 7)
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long v) throws Exception {
                    return new Tuple2<>(v, (v + 1));
                }
            });

    IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();

    SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
            .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                    return tuple;
                }
            })
            .split(new OutputSelector<Tuple2<Long, Long>>() {
                @Override
                public Iterable<String> select(Tuple2<Long, Long> tuple) {
                    List<String> output = new ArrayList<>();
                    output.add("iterate");
                    return output;
                }
            });

    iteration.closeWith(step.select("iterate"));

    env.execute("Sandbox");
}
{code}

Moving the groupBy before the iteration solves the issue. e.g. this works:

{code}
... 
iteration = edges.groupBy(1).iterate();
iteration.map(...)
{code}

Here is the stack trace:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
    at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
    at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
    at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)