Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4649#discussion_r161797181

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
    @@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
     		StreamNode upstreamNode = getStreamNode(upStreamVertexID);
     		StreamNode downstreamNode = getStreamNode(downStreamVertexID);

    +		Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
    +		if (!uniqueEdgeMap.containsKey(edgePair)) {
    +			uniqueEdgeMap.put(edgePair, 1);
    +		} else {
    +			uniqueEdgeMap.put(edgePair, uniqueEdgeMap.get(edgePair) + 1);
    --- End diff --

    Maybe wrap all of this new code into a function:
    ```
    int edgeSubId = generateUniqueEdgeSubId(edgePair);
    (...)
    StreamEdge edge = new StreamEdge(..., edgeSubId);
    ```
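
For illustration, a minimal standalone sketch of what the suggested `generateUniqueEdgeSubId` helper might look like. Several things here are assumptions rather than part of the PR: the class name `EdgeSubIdSketch` is hypothetical, `Map.Entry` stands in for Flink's `Tuple2` so the snippet compiles without Flink on the classpath, the helper takes the two vertex IDs instead of a prebuilt `edgePair`, and the returned sub-ID is assumed to be the updated per-pair count (the diff is cut off before it shows how the counter is used).

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the relevant part of StreamGraph.
class EdgeSubIdSketch {

    // Number of edges seen so far for each (upstream ID, downstream ID) pair.
    private final Map<Map.Entry<Integer, Integer>, Integer> uniqueEdgeMap = new HashMap<>();

    // Returns 1 for the first edge between the two vertices, 2 for the second, and so on.
    int generateUniqueEdgeSubId(int upstreamId, int downstreamId) {
        Map.Entry<Integer, Integer> edgePair =
                new SimpleImmutableEntry<>(upstreamId, downstreamId);
        // merge() stores 1 if the key is absent, otherwise adds 1 to the current value,
        // and returns the value now stored. This replaces the containsKey/put/get branch.
        return uniqueEdgeMap.merge(edgePair, 1, Integer::sum);
    }
}
```

With this shape, `addEdgeInternal` would only need the single call shown in the review comment, and the counting logic would live in one place.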
---