zjureel commented on code in PR #19380:
URL: https://github.com/apache/flink/pull/19380#discussion_r974066032
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java:
##########
@@ -243,6 +246,22 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exc
             if (toRelease != null) {
                 releaseViewReader(toRelease);
             }
+        } else if (msg.getClass() == PartitionRequestNotifierTimeout.class) {
+            PartitionRequestNotifierTimeout partitionRequestNotifierTimeout = (PartitionRequestNotifierTimeout) msg;
+
+            // Send partition not found message to the downstream task when the notifier is timeout.
+            final PartitionRequestNotifier partitionRequestNotifier = partitionRequestNotifierTimeout
+                    .getPartitionRequestNotifier();
+            final ResultPartitionID resultPartitionId = partitionRequestNotifier.getResultPartitionId();
+            final InputChannelID inputChannelId = partitionRequestNotifier.getReceiverId();
+            availableReaders.remove(partitionRequestNotifier.getViewReader());
+            allReaders.remove(inputChannelId);

Review Comment:
   When the Netty server receives a partition request, it creates a reader and adds it to `allReaders` even if the partition is not registered yet. That reader should be removed here, when the notifier times out, because the partition request listener for the partition may no longer be registered in the TM.
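
To illustrate the cleanup described in the review comment, here is a minimal sketch of the bookkeeping. It is a simplified model, not Flink's actual code: `ReaderRegistrySketch`, `Reader`, and every method name below are hypothetical, and plain strings stand in for `InputChannelID`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Simplified stand-in for the reader bookkeeping; all names here are hypothetical.
final class ReaderRegistrySketch {

    // Hypothetical reader that only carries the receiver (input channel) id.
    static final class Reader {
        final String receiverId;

        Reader(String receiverId) {
            this.receiverId = receiverId;
        }
    }

    private final Map<String, Reader> allReaders = new HashMap<>();
    private final Queue<Reader> availableReaders = new ArrayDeque<>();

    // A partition request always registers a reader,
    // even if the partition itself is not registered yet.
    Reader onPartitionRequest(String receiverId) {
        Reader reader = new Reader(receiverId);
        allReaders.put(receiverId, reader);
        return reader;
    }

    // Marks a reader as having data ready to send.
    void markAvailable(Reader reader) {
        availableReaders.add(reader);
    }

    // On notifier timeout, drop the reader from both collections so it does not leak.
    // Returns true if a reader was tracked for this receiver.
    boolean onNotifierTimeout(String receiverId) {
        Reader reader = allReaders.remove(receiverId);
        if (reader == null) {
            return false;
        }
        availableReaders.remove(reader);
        return true;
    }

    int trackedReaders() {
        return allReaders.size();
    }

    int availableCount() {
        return availableReaders.size();
    }

    public static void main(String[] args) {
        ReaderRegistrySketch registry = new ReaderRegistrySketch();
        registry.onPartitionRequest("channel-1");
        System.out.println("tracked before timeout: " + registry.trackedReaders());
        registry.onNotifierTimeout("channel-1");
        System.out.println("tracked after timeout: " + registry.trackedReaders());
    }
}
```

Without the removal in `onNotifierTimeout`, a reader created for a partition that never gets registered would stay in the map for the lifetime of the connection.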