zhijiangW commented on a change in pull request #11877: URL: https://github.com/apache/flink/pull/11877#discussion_r425072084
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ########## @@ -328,4 +325,67 @@ public void operationComplete(ChannelFuture future) throws Exception { } } } + + /** + * Outbound message to be sent to the client. + */ + public static abstract class ServerOutboundMessage { + protected final InputChannelID receiverId; + protected final int backlog; + private final boolean moreAvailable; + + ServerOutboundMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) { + checkArgument(backlog >= 0, "Number of backlog must be non-negative."); + this.receiverId = checkNotNull(receiverId); + this.backlog = backlog; + this.moreAvailable = moreAvailable; + } + + abstract Object build(); + + public boolean isMoreAvailable() { + return moreAvailable; + } + + void recycleBufferIfNeeded() { + } + } + + static class BufferResponseMessage extends ServerOutboundMessage { + private final Buffer buffer; + private final int sequenceNumber; + + BufferResponseMessage( + Buffer buffer, + InputChannelID receiverId, + int sequenceNumber, + int backlog, + boolean moreAvailable) { + super(receiverId, backlog, moreAvailable); + this.buffer = checkNotNull(buffer); + this.sequenceNumber = sequenceNumber; + } + + @Override + Object build() { + return new BufferResponse(buffer, sequenceNumber, receiverId, backlog); + } + + @Override + void recycleBufferIfNeeded() { + buffer.recycleBuffer(); + } + } + + static class AddBacklogMessage extends ServerOutboundMessage { + + AddBacklogMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) { Review comment: Remove the `moreAvailable` argument from the constructor, because it seems strange for `AddBacklogMessage`; we can then pass `false` to the `super` call below instead.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ########## @@ -94,10 +97,11 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) { * <p>NOTE: Only one thread would trigger the actual enqueue after checking the reader's * availability, so there is no race condition here. */ - private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception { - if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) { + void enqueueAvailableReader(final NetworkSequenceViewReader reader, BooleanSupplier condition) throws Exception { Review comment: Adjust the javadoc accordingly ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org