Repository: flink Updated Branches: refs/heads/release-1.5 41e2b3e17 -> dd5639164
[FLINK-9216][Streaming] Fix comparator violation This closes #5878. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05c6ef1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05c6ef1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05c6ef1e Branch: refs/heads/release-1.5 Commit: 05c6ef1efede6a3f51eb79103ae0ff392dda7b07 Parents: 547152e Author: Xpray <[email protected]> Authored: Mon Apr 23 15:37:39 2018 +0800 Committer: zentol <[email protected]> Committed: Wed May 2 15:18:59 2018 +0200 ---------------------------------------------------------------------- .../streaming/api/graph/JSONGenerator.java | 12 ++++++---- .../api/graph/StreamGraphGeneratorTest.java | 24 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05c6ef1e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index 263e0aa..3f82cf3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -61,14 +61,16 @@ public class JSONGenerator { List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override - public int compare(Integer o1, Integer o2) { + public int compare(Integer idOne, Integer idTwo) { + boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne); + boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo); // put sinks at the back - if (streamGraph.getSinkIDs().contains(o1)) { + if (isIdOneSinkId == isIdTwoSinkId) { + return idOne.compareTo(idTwo); + } else if (isIdOneSinkId) { return 1; - } else if (streamGraph.getSinkIDs().contains(o2)) { - return -1; } else { - return o1 - o2; + return -1; } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/05c6ef1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 8149d24..d10fb3c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -413,6 +414,29 @@ public class StreamGraphGeneratorTest { StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner(); } + /** + * Tests that the json generated by JSONGenerator shall meet with 2 requirements: + * 1. sink nodes are at the back + * 2. if both two nodes are sink nodes or neither of them is sink node, then sort by its id. + */ + @Test + public void testSinkIdComparison() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> source = env.fromElements(1, 2, 3); + for (int i = 0; i < 32; i++) { + if (i % 2 == 0) { + source.addSink(new SinkFunction<Integer>() { + @Override + public void invoke(Integer value) throws Exception {} + }); + } else { + source.map(x -> x + 1); + } + } + // IllegalArgumentException will be thrown without FLINK-9216 + env.getStreamGraph().getStreamingPlanAsJSON(); + } + private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
