Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4649#discussion_r161797181
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
    @@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
                        StreamNode upstreamNode = 
getStreamNode(upStreamVertexID);
                        StreamNode downstreamNode = 
getStreamNode(downStreamVertexID);
     
    +                   Tuple2<Integer, Integer> edgePair = new 
Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
    +                   if (!uniqueEdgeMap.containsKey(edgePair)) {
    +                           uniqueEdgeMap.put(edgePair, 1);
    +                   } else {
    +                           uniqueEdgeMap.put(edgePair, 
uniqueEdgeMap.get(edgePair) + 1);
    --- End diff --
    
    maybe wrap all of this new code into function 
    ```
    int edgeSubId = generateUniqueEdgeSubId(edgePair);
    (...)
    StreamEdge edge = new StreamEdge(..., edgeSubId);
    ```


---

Reply via email to