zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425072084



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -328,4 +325,67 @@ public void operationComplete(ChannelFuture future) throws Exception {
                        }
                }
        }
+
+       /**
+        * Outbound message to be sent to the client.
+        */
+       public static abstract class ServerOutboundMessage {
+               protected final InputChannelID receiverId;
+               protected final int backlog;
+               private final boolean moreAvailable;
+
+               ServerOutboundMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {
+                       checkArgument(backlog >= 0, "Number of backlog must be non-negative.");
+                       this.receiverId = checkNotNull(receiverId);
+                       this.backlog = backlog;
+                       this.moreAvailable = moreAvailable;
+               }
+
+               abstract Object build();
+
+               public boolean isMoreAvailable() {
+                       return moreAvailable;
+               }
+
+               void recycleBufferIfNeeded() {
+               }
+       }
+
+       static class BufferResponseMessage extends ServerOutboundMessage {
+               private final Buffer buffer;
+               private final int sequenceNumber;
+
+               BufferResponseMessage(
+                               Buffer buffer,
+                               InputChannelID receiverId,
+                               int sequenceNumber,
+                               int backlog,
+                               boolean moreAvailable) {
+                       super(receiverId, backlog, moreAvailable);
+                       this.buffer = checkNotNull(buffer);
+                       this.sequenceNumber = sequenceNumber;
+               }
+
+               @Override
+               Object build() {
+                       return new BufferResponse(buffer, sequenceNumber, receiverId, backlog);
+               }
+
+               @Override
+               void recycleBufferIfNeeded() {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       static class AddBacklogMessage extends ServerOutboundMessage {
+
+               AddBacklogMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {

Review comment:
       Remove the `moreAvailable` argument from the constructor, since it seems out of place for `AddBacklogMessage`; we can pass `false` to `super` below instead.
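
A minimal sketch of the suggested change, not the actual patch. `InputChannelID` here is a simplified stand-in for the Flink class, `Objects.requireNonNull` and a plain `IllegalArgumentException` replace Flink's `checkNotNull`/`checkArgument`, and the wrapper class `AddBacklogSketch` is hypothetical.

```java
import java.util.Objects;

public class AddBacklogSketch {

    /** Simplified stand-in for Flink's InputChannelID (assumption for this sketch). */
    static final class InputChannelID {
    }

    /** Reduced version of the base class from the diff above. */
    abstract static class ServerOutboundMessage {
        protected final InputChannelID receiverId;
        protected final int backlog;
        private final boolean moreAvailable;

        ServerOutboundMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {
            if (backlog < 0) {
                throw new IllegalArgumentException("Backlog must be non-negative.");
            }
            this.receiverId = Objects.requireNonNull(receiverId);
            this.backlog = backlog;
            this.moreAvailable = moreAvailable;
        }

        public boolean isMoreAvailable() {
            return moreAvailable;
        }
    }

    /** The constructor no longer takes moreAvailable; it always passes false to super. */
    static class AddBacklogMessage extends ServerOutboundMessage {
        AddBacklogMessage(InputChannelID receiverId, int backlog) {
            super(receiverId, backlog, false);
        }
    }

    public static void main(String[] args) {
        AddBacklogMessage message = new AddBacklogMessage(new InputChannelID(), 3);
        System.out.println("backlog=" + message.backlog
            + ", moreAvailable=" + message.isMoreAvailable());
    }
}
```

With this shape, callers cannot construct an `AddBacklogMessage` that claims more data is available.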

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -94,10 +97,11 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
         * <p>NOTE: Only one thread would trigger the actual enqueue after checking the reader's
         * availability, so there is no race condition here.
         */
-       private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
-               if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
+       void enqueueAvailableReader(final NetworkSequenceViewReader reader, BooleanSupplier condition) throws Exception {

Review comment:
       Adjust the Javadoc accordingly so that it covers the new `condition` parameter.
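
One possible shape for the adjusted Javadoc, as a hedged sketch only. It assumes the `condition` supplier takes the place of the removed `reader.isAvailable()` check, which the visible hunk does not confirm. `Reader`, `EnqueueSketch`, and the `boolean` return value are hypothetical simplifications (the real method returns `void` and works on `NetworkSequenceViewReader`).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

public class EnqueueSketch {

    /** Hypothetical, reduced stand-in for NetworkSequenceViewReader. */
    static final class Reader {
        private boolean registeredAsAvailable;

        boolean isRegisteredAsAvailable() {
            return registeredAsAvailable;
        }

        void setRegisteredAsAvailable(boolean registered) {
            this.registeredAsAvailable = registered;
        }
    }

    private final Queue<Reader> availableReaders = new ArrayDeque<>();

    /**
     * Tries to enqueue the reader once it is eligible to be served.
     *
     * <p>The reader is skipped if it is already registered as available or if the
     * given condition evaluates to false.
     *
     * @param reader the reader to enqueue
     * @param condition evaluated at most once per call; the reader is enqueued only if it returns true
     * @return whether the reader was enqueued (returned here only to make the sketch testable)
     */
    boolean enqueueAvailableReader(Reader reader, BooleanSupplier condition) {
        if (reader.isRegisteredAsAvailable() || !condition.getAsBoolean()) {
            return false;
        }
        availableReaders.add(reader);
        reader.setRegisteredAsAvailable(true);
        return true;
    }

    int queuedReaders() {
        return availableReaders.size();
    }

    public static void main(String[] args) {
        EnqueueSketch queue = new EnqueueSketch();
        Reader reader = new Reader();
        System.out.println(queue.enqueueAvailableReader(reader, () -> false));
        System.out.println(queue.enqueueAvailableReader(reader, () -> true));
        System.out.println(queue.queuedReaders());
    }
}
```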



