This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7116ab71edc183d34d128453e06a3efc15ad8905 Author: Zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Fri Jul 26 11:50:55 2019 +0200 [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs --- .../runtime/io/network/netty/PartitionRequestQueue.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index b492ea6..4a845d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; -import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; @@ -40,7 +39,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -62,8 +60,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { /** All the readers created for the consumers' partition requests. */ private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>(); - private final Set<InputChannelID> released = Sets.newHashSet(); - private boolean fatalError; private ChannelHandlerContext ctx; @@ -175,9 +171,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { } else if (msg.getClass() == InputChannelID.class) { // Release partition view that get a cancel request. InputChannelID toCancel = (InputChannelID) msg; - if (released.contains(toCancel)) { - return; - } // remove reader from queue of available readers availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel)); @@ -222,7 +215,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { if (!reader.isReleased()) { continue; } - markAsReleased(reader.getReceiverId()); Throwable cause = reader.getFailureCause(); if (cause != null) { @@ -312,14 +304,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { reader.notifySubpartitionConsumed(); reader.setRegisteredAsAvailable(false); reader.releaseAllResources(); - markAsReleased(reader.getReceiverId()); - } - - /** - * Marks a receiver as released. - */ - private void markAsReleased(InputChannelID receiverId) { - released.add(receiverId); } // This listener is called after an element of the current nonEmptyReader has been