wanglijie95 commented on a change in pull request #18130:
URL: https://github.com/apache/flink/pull/18130#discussion_r791429362



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -94,6 +97,7 @@ protected InputChannel(
         this.inputGate = checkNotNull(inputGate);
         this.channelInfo = new InputChannelInfo(inputGate.getGateIndex(), channelIndex);
         this.partitionId = checkNotNull(partitionId);
+        this.consumedSubpartitionIndex = consumedSubpartitionIndex;

Review comment:
       Done

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -168,8 +176,7 @@ protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException
      * <p>The queue index to request depends on which sub task the channel belongs to and is
      * specified by the consumer of this channel.

Review comment:
       Done

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
 
     // ------------------------------------------------------------------------
 
-    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+    public Map<SubpartitionInfo, InputChannel> getInputChannels() {
         return inputChannels;
     }
+
+    static class SubpartitionInfo {
+        private final IntermediateResultPartitionID partitionID;
+        private final int subpartitionIndex;
+
+        SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) {
+            this.partitionID = partitionID;
+            this.subpartitionIndex = subpartitionIndex;
+        }
+
+        @Override
+        public int hashCode() {
+            return partitionID.hashCode() ^ subpartitionIndex;
+        }
+
+        @Override
+        public boolean equals(Object obj) {

Review comment:
       Done

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -224,15 +245,25 @@ private InputChannel createInputChannel(
                                 inputGate,
                                 index,
                                 nettyShuffleDescriptor,
+                                consumedSubpartitionIndex,
                                 channelStatistics,
                                 metrics));
     }
 
+    private static int calculateNumChannels(
+            int numShuffleDescriptors, SubpartitionIndexRange subpartitionIndexRange) {
+        return numShuffleDescriptors

Review comment:
       Added.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
 
     // ------------------------------------------------------------------------
 
-    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+    public Map<SubpartitionInfo, InputChannel> getInputChannels() {
         return inputChannels;
     }
+
+    static class SubpartitionInfo {
+        private final IntermediateResultPartitionID partitionID;
+        private final int subpartitionIndex;
+
+        SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) {
+            this.partitionID = partitionID;

Review comment:
       Done

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
 
     // ------------------------------------------------------------------------
 
-    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+    public Map<SubpartitionInfo, InputChannel> getInputChannels() {
         return inputChannels;
     }
+
+    static class SubpartitionInfo {
+        private final IntermediateResultPartitionID partitionID;
+        private final int subpartitionIndex;
+
+        SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) {
+            this.partitionID = partitionID;
+            this.subpartitionIndex = subpartitionIndex;

Review comment:
       Done.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -834,14 +817,82 @@ public void testUpdateUnknownInputChannel() throws Exception {
                             localResultPartitionId.getPartitionId(), localLocation));
 
             assertThat(
-                    inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
+                    inputGate
+                            .getInputChannels()
+                            .get(createSubpartitionInfo(remoteResultPartitionId.getPartitionId())),
                     is(instanceOf((RemoteInputChannel.class))));
             assertThat(
-                    inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
+                    inputGate
+                            .getInputChannels()
+                            .get(createSubpartitionInfo(localResultPartitionId.getPartitionId())),
                     is(instanceOf((LocalInputChannel.class))));
         }
     }
 
+    @Test
+    public void testUpdateUnknownChannelWithSubpartitionIndexRange()

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to