This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7116ab71edc183d34d128453e06a3efc15ad8905
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Fri Jul 26 11:50:55 2019 +0200

    [FLINK-13245][network] Remove redundant bookkeeping for already canceled 
input channel IDs
---
 .../runtime/io/network/netty/PartitionRequestQueue.java  | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
        /** All the readers created for the consumers' partition requests. */
        private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> 
allReaders = new ConcurrentHashMap<>();
 
-       private final Set<InputChannelID> released = Sets.newHashSet();
-
        private boolean fatalError;
 
        private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                } else if (msg.getClass() == InputChannelID.class) {
                        // Release partition view that get a cancel request.
                        InputChannelID toCancel = (InputChannelID) msg;
-                       if (released.contains(toCancel)) {
-                               return;
-                       }
 
                        // remove reader from queue of available readers
                        availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                                        if (!reader.isReleased()) {
                                                continue;
                                        }
-                                       markAsReleased(reader.getReceiverId());
 
                                        Throwable cause = 
reader.getFailureCause();
                                        if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                reader.notifySubpartitionConsumed();
                reader.setRegisteredAsAvailable(false);
                reader.releaseAllResources();
-               markAsReleased(reader.getReceiverId());
-       }
-
-       /**
-        * Marks a receiver as released.
-        */
-       private void markAsReleased(InputChannelID receiverId) {
-               released.add(receiverId);
        }
 
        // This listener is called after an element of the current 
nonEmptyReader has been

Reply via email to