akalash commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r661557618



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
 
         updateStatistics(current);
 
-        // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
-        // next header
-        Buffer.DataType nextDataType =
-                numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+        // We simply assume all the data except for the last one (EndOfPartitionEvent)
+        // are non-events for batch jobs to avoid pre-fetching the next header
+        Buffer.DataType nextDataType = Buffer.DataType.NONE;
+        if (numDataBuffers > 0) {
+            nextDataType = Buffer.DataType.DATA_BUFFER;
+        } else if (numDataAndEventBuffers > 0) {
+            nextDataType = Buffer.DataType.EVENT_BUFFER;
+        }

Review comment:
       Is this also a bug fix? Or why do we now distinguish EVENT_BUFFER from DATA_BUFFER?
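
A minimal sketch of the new branch as a pure function, assuming (as the diff's own comment does) that the only event left in a bounded blocking subpartition is the trailing EndOfPartitionEvent. `NextDataTypeSketch` and the local `DataType` enum are hypothetical stand-ins for `Buffer.DataType`, not Flink code:

```java
/** Hypothetical stand-in for Buffer.DataType, reduced to the values used in the diff. */
enum DataType {
    NONE,
    DATA_BUFFER,
    EVENT_BUFFER
}

final class NextDataTypeSketch {
    private NextDataTypeSketch() {}

    /**
     * Mirrors the new branch: report DATA_BUFFER while data buffers remain, EVENT_BUFFER once
     * only the trailing event is left, and NONE when nothing is left.
     */
    static DataType nextDataType(int numDataBuffers, int numDataAndEventBuffers) {
        if (numDataBuffers > 0) {
            return DataType.DATA_BUFFER;
        } else if (numDataAndEventBuffers > 0) {
            return DataType.EVENT_BUFFER;
        }
        return DataType.NONE;
    }
}
```

With data still queued, e.g. `nextDataType(2, 3)`, the result is `DATA_BUFFER` as before; only the last announcement, `nextDataType(0, 1)`, changes from `DATA_BUFFER` to `EVENT_BUFFER`, which is the behavioral difference the question is about.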

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -130,11 +136,11 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
 
     /** Requests exclusive buffers from the provider. */
     void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
-        checkArgument(
-                !segments.isEmpty(),
-                "The number of exclusive buffers per channel should be larger than 0.");
+        if (numExclusiveBuffers <= 0) {

Review comment:
       As I understand it, a negative number is still illegal. So maybe it makes sense to add a checkArgument that rejects `numExclusiveBuffers < 0`?
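
A sketch of what the suggested check could look like, assuming zero exclusive buffers is now a legal configuration. `ExclusiveBufferCheckSketch` and its local `checkArgument` helper are hypothetical; inside Flink the helper would presumably be `org.apache.flink.util.Preconditions#checkArgument`:

```java
final class ExclusiveBufferCheckSketch {
    private ExclusiveBufferCheckSketch() {}

    /** Local stand-in for a checkArgument precondition helper. */
    static void checkArgument(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    /**
     * Returns whether segments should be requested at all: zero exclusive buffers is treated as a
     * legal "request nothing" case, while a negative value is rejected up front.
     */
    static boolean shouldRequestExclusiveBuffers(int numExclusiveBuffers) {
        checkArgument(
                numExclusiveBuffers >= 0,
                "The number of exclusive buffers per channel should not be negative.");
        return numExclusiveBuffers > 0;
    }
}
```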

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -110,7 +114,12 @@ private void updateStatistics(Buffer buffer) {
     public boolean isAvailable(int numCreditsAvailable) {
         // We simply assume there are no events except EndOfPartitionEvent for bath jobs,
         // then it has no essential effect to ignore the judgement of next event buffer.
-        return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+        return (numCreditsAvailable > 0 || numDataBuffers == 0) && numDataAndEventBuffers > 0;

Review comment:
       Does this mean that we don't need the credit for sending the event?
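
Evaluating the new expression in isolation suggests the answer is yes, at least for the trailing event: once `numDataBuffers` reaches 0, the reader reports availability even with zero credits. `DirectTransferAvailabilitySketch` is a hypothetical extraction of the expression, with the fields turned into parameters:

```java
final class DirectTransferAvailabilitySketch {
    private DirectTransferAvailabilitySketch() {}

    /** Same boolean expression as the new isAvailable(), with fields passed in as parameters. */
    static boolean isAvailable(
            int numCreditsAvailable, int numDataBuffers, int numDataAndEventBuffers) {
        return (numCreditsAvailable > 0 || numDataBuffers == 0) && numDataAndEventBuffers > 0;
    }
}
```

`isAvailable(0, 1, 2)` stays `false` because a data buffer still needs credit, while `isAvailable(0, 0, 1)` returns `true`, so the last event would be sent without a credit.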

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -100,8 +106,19 @@ public void addCredit(int creditDeltas) {
         numCreditsAvailable += creditDeltas;
     }
 
+    @Override
+    public boolean needAnnounceBacklog() {
+        return initialCredit == 0 && numCreditsAvailable == 0;
+    }
+
     @Override
     public void resumeConsumption() {
+        if (initialCredit == 0) {
+            // reset available credit if no exclusive buffer is available at the
+            // consumer side for all floating buffers must have been released
+            numCreditsAvailable = 0;

Review comment:
       Do we do this because we know that all floating buffers will have been released before the checkpoint when `initialCredit == 0`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }
 
+    /**
+     * Announces remaining backlog to the consumer after the available data notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {

Review comment:
       As I understand it, the backlog cannot be less than or equal to 0 here. So maybe convert this into a checkArgument? Or did I miss something?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -340,7 +355,15 @@ private void decodeBufferOrEvent(
             RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
             throws Throwable {
         if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
-            inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            try {
+                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            } finally {
+                // recycle the empty buffer directly
+                Buffer buffer = bufferOrEvent.getBuffer();
+                if (buffer != null) {
+                    buffer.recycleBuffer();

Review comment:
       Before these changes, where was this buffer recycled? Or was this a bug?
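
For reference, the new code follows a recycle-in-finally pattern: the empty buffer is released whether or not `onEmptyBuffer` throws. A self-contained sketch with hypothetical `FakeBuffer` and `EmptyBufferHandler` types (the real method throws `Throwable`; the sketch uses unchecked exceptions for brevity):

```java
final class EmptyBufferRecycleSketch {
    private EmptyBufferRecycleSketch() {}

    /** Hypothetical stand-in for a network buffer that counts how often it was recycled. */
    static final class FakeBuffer {
        int recycleCount;

        void recycleBuffer() {
            recycleCount++;
        }
    }

    /** Hypothetical stand-in for RemoteInputChannel#onEmptyBuffer. */
    interface EmptyBufferHandler {
        void onEmptyBuffer(int sequenceNumber, int backlog);
    }

    /** Passes sequence number and backlog to the channel, then always recycles the buffer. */
    static void decodeEmptyBuffer(
            EmptyBufferHandler channel, FakeBuffer buffer, int sequenceNumber, int backlog) {
        try {
            channel.onEmptyBuffer(sequenceNumber, backlog);
        } finally {
            // runs on both the normal and the exceptional path, so the buffer cannot leak
            if (buffer != null) {
                buffer.recycleBuffer();
            }
        }
    }
}
```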

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -288,9 +288,7 @@ BufferAndBacklog pollBuffer() {
 
             if (buffers.isEmpty()) {
                 flushRequested = false;
-            }
-
-            while (!buffers.isEmpty()) {
+            } else {

Review comment:
       I don't really get the idea of these changes.
   
   Let's suppose PipelinedSubpartition#buffers contains several buffers, but the first one is already empty and finished.
   
   How it was before the changes:
   - PartitionRequestQueue requests the buffer.
   - In any case, PipelinedSubpartition#pollBuffer returns a buffer (it skips the first one because it is empty and finished, and returns the buffer from the next consumer).
   - PartitionRequestQueue continues to request from this reader as long as PipelinedSubpartition#buffers is not empty.
   
   After the changes:
   
   - PartitionRequestQueue requests the buffer.
   - PipelinedSubpartition#pollBuffer returns null.
   - PartitionRequestQueue removes this reader from the available readers.
   - The other buffers in PipelinedSubpartition#buffers are sent only when the timeout happens and this reader is added to the available list again.
   
   What is the point of delaying the sending if we already have credit for it and the buffer is ready to be sent?
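
The two sequences above can be modeled with a plain deque, assuming an "empty and finished" consumer is represented by a `null` payload. `PollBufferSketch`, `Item`, `pollSkippingEmpty` and `pollHeadOnly` are hypothetical and only reproduce the control flow as described in this comment, not the actual `pollBuffer()` body:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PollBufferSketch {
    private PollBufferSketch() {}

    /** Hypothetical stand-in for a BufferConsumer; a null payload means "empty and finished". */
    static final class Item {
        final String payload;

        Item(String payload) {
            this.payload = payload;
        }

        boolean isEmptyAndFinished() {
            return payload == null;
        }
    }

    /** Before: drop empty, finished consumers and keep looking until data is found. */
    static String pollSkippingEmpty(Deque<Item> buffers) {
        while (!buffers.isEmpty()) {
            Item head = buffers.pollFirst();
            if (!head.isEmptyAndFinished()) {
                return head.payload;
            }
        }
        return null;
    }

    /** After: look at the head only; an empty, finished head makes this call return null. */
    static String pollHeadOnly(Deque<Item> buffers) {
        if (buffers.isEmpty()) {
            return null;
        }
        Item head = buffers.pollFirst();
        return head.isEmptyAndFinished() ? null : head.payload;
    }

    /** Builds the queue from the example: an empty, finished consumer followed by data. */
    static Deque<Item> exampleQueue() {
        Deque<Item> buffers = new ArrayDeque<>();
        buffers.add(new Item(null));
        buffers.add(new Item("data"));
        return buffers;
    }
}
```

In the head-only variant the data item is still queued after the call, which is the delay this comment is asking about.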

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
 
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {
+                return 0;
+            }
+
+            if (flushRequested
+                    || isFinished
+                    || !checkNotNull(buffers.peekLast()).getBufferConsumer().isBuffer()) {

Review comment:
       This looks dangerous: why are you sure that `buffers` contains at least one element?
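
For comparison, the diff does check `buffers.isEmpty()` earlier in the same `synchronized (buffers)` block, which would make `peekLast()` non-null provided every mutation of `buffers` also holds that lock. A hypothetical sketch of that invariant, with data buffers and events encoded as `true`/`false` (`BacklogSketch` is not Flink code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BacklogSketch {
    /** true = data buffer, false = event; every access happens under the same lock. */
    private final Deque<Boolean> buffers = new ArrayDeque<>();

    private int buffersInBacklog;
    private boolean flushRequested;

    void add(boolean isDataBuffer) {
        synchronized (buffers) {
            buffers.add(isDataBuffer);
            if (isDataBuffer) {
                buffersInBacklog++;
            }
        }
    }

    void flush() {
        synchronized (buffers) {
            flushRequested = true;
        }
    }

    int getBuffersInBacklog() {
        synchronized (buffers) {
            if (buffers.isEmpty()) {
                return 0;
            }
            // Cannot be null: emptiness was checked above while holding the same lock,
            // and no other thread can remove elements until the lock is released.
            boolean lastIsData = buffers.peekLast();
            if (flushRequested || !lastIsData) {
                return buffersInBacklog;
            }
            // the last data buffer may still be written to, so it is not counted yet
            return Math.max(buffersInBacklog - 1, 0);
        }
    }
}
```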

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                 channelInfo,
                 channelStatePersister,
                 next.getSequenceNumber());
+
+        // ignore the empty buffer directly
+        if (buffer.readableBytes() == 0) {
+            buffer.recycleBuffer();

Review comment:
       Same question as earlier: is this a bug, or where did we recycle the buffer before these changes?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(
     // ------------------------------------------------------------------------
 
     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {
         synchronized (bufferQueue) {
+            // decrease the number of buffers require to avoid the possibility of
+            // allocating more than required buffers after the buffer is taken
+            if (initialCredit == 0) {
+                --numRequiredBuffers;

Review comment:
       I don't understand why `initialCredit == 0` should be handled differently here. Even if `initialCredit == 1`, we should decrease this value when a buffer is requested, or am I wrong?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
 
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);

Review comment:
       Do we need to do this because we released all buffers in `onBlockingUpstream`? If so, can we keep this code in one place, e.g. by moving it into `onBlockingUpstream`?
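
A hypothetical sketch of the single-place variant, where `onBlockingUpstream` both releases the floating buffers and drops the credit they backed, so `resumeConsumption` needs no `initialCredit` check. `RemoteChannelSketch` and its fields are illustrative only, and it assumes no credit is legitimately added between blocking and resuming:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RemoteChannelSketch {
    private final int initialCredit;
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private int floatingBuffers;

    RemoteChannelSketch(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    /** A newly received floating buffer adds one unit of credit to announce. */
    void addFloatingBuffer() {
        floatingBuffers++;
        unannouncedCredit.incrementAndGet();
    }

    /** The only place that releases floating buffers and resets the credit backed by them. */
    void onBlockingUpstream() {
        if (initialCredit == 0) {
            floatingBuffers = 0;
            unannouncedCredit.set(0);
        }
    }

    /** No initialCredit check here any more; notifying the producer is omitted. */
    void resumeConsumption() {}

    int unannouncedCredit() {
        return unannouncedCredit.get();
    }

    int floatingBuffers() {
        return floatingBuffers;
    }
}
```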

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(
     // ------------------------------------------------------------------------
 
     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {

Review comment:
       As I understand it, `initialCredit` is an immutable value, and `BufferManager` and `AvailableBufferQueue` know this value, so maybe it is better to avoid this parameter?
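
If `initialCredit` is indeed fixed for the lifetime of the channel, it could be passed once at construction, as in this hypothetical sketch. `BufferManagerSketch` models buffers as strings and keeps the diff's `initialCredit == 0` condition unchanged; where exactly the decrement sits relative to taking the buffer is not visible in the diff, so here it simply precedes the poll:

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class BufferManagerSketch {
    private final Queue<String> bufferQueue = new ArrayDeque<>();
    /** Fixed at construction, so requestBuffer() no longer needs it as a parameter. */
    private final int initialCredit;
    private int numRequiredBuffers;

    BufferManagerSketch(int initialCredit, int numRequiredBuffers) {
        this.initialCredit = initialCredit;
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void addAvailableBuffer(String buffer) {
        synchronized (bufferQueue) {
            bufferQueue.add(buffer);
        }
    }

    /** Returns the next available buffer, or null if none is queued. */
    String requestBuffer() {
        synchronized (bufferQueue) {
            if (initialCredit == 0) {
                // same condition as the diff, read from the field instead of an argument
                --numRequiredBuffers;
            }
            return bufferQueue.poll();
        }
    }

    int numRequiredBuffers() {
        synchronized (bufferQueue) {
            return numRequiredBuffers;
        }
    }
}
```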

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
 
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);
+        }
+
         // notifies the producer that this channel is ready to
         // unblock from checkpoint and resume data consumption
         partitionRequestClient.resumeConsumption(this);
     }
 
+    private void onBlockingUpstream() {
+        if (initialCredit == 0) {

Review comment:
       Why can't we do the same for any number of credits, not only for `initialCredit == 0`? Or would that slow things down after resuming?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -215,9 +221,15 @@ public void recycle(MemorySegment segment) {
     }
 
     void releaseFloatingBuffers() {
+        Queue<Buffer> buffers;
         synchronized (bufferQueue) {
             numRequiredBuffers = 0;
-            bufferQueue.releaseFloatingBuffers();
+            buffers = bufferQueue.clearFloatingBuffers();
+        }
+
+        // recycle all buffers out of the synchronization block to avoid dead lock

Review comment:
       Can you explain what kind of deadlock can happen? Between LocalBufferPool and BufferManager?
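
The usual reason for this pattern is lock ordering: if recycling a buffer takes the pool's lock while the thread still holds `bufferQueue`, and another thread holding the pool's lock calls back into the buffer manager (for example to hand over a newly available buffer), each thread waits for the other. Whether `LocalBufferPool` and `BufferManager` actually interact that way is the question here; the hypothetical `ReleaseOutsideLockSketch` below only shows the collect-under-lock, recycle-outside-lock pattern:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class ReleaseOutsideLockSketch {
    private final Object bufferQueueLock = new Object();
    private final Queue<String> floatingBuffers = new ArrayDeque<>();
    /** Hypothetical recycler; in a real pool it may acquire the pool's own lock. */
    private final Consumer<String> recycler;

    ReleaseOutsideLockSketch(Consumer<String> recycler) {
        this.recycler = recycler;
    }

    void addFloatingBuffer(String buffer) {
        synchronized (bufferQueueLock) {
            floatingBuffers.add(buffer);
        }
    }

    void releaseFloatingBuffers() {
        List<String> toRecycle;
        synchronized (bufferQueueLock) {
            // only detach the buffers while holding the lock
            toRecycle = new ArrayList<>(floatingBuffers);
            floatingBuffers.clear();
        }
        // recycle without holding bufferQueueLock, so this thread never holds
        // both locks at once and cannot take part in a lock-ordering cycle
        for (String buffer : toRecycle) {
            recycler.accept(buffer);
        }
    }

    /** Exposed so a test can check that the lock is not held during recycling. */
    Object bufferQueueLock() {
        return bufferQueueLock;
    }
}
```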

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                 channelInfo,
                 channelStatePersister,
                 next.getSequenceNumber());
+
+        // ignore the empty buffer directly
+        if (buffer.readableBytes() == 0) {

Review comment:
       How is it even possible to get an empty buffer? Who is sending it?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
 
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {

Review comment:
       This seems like the wrong place for such a condition. Logically, even if the subpartition is blocked, it still has the buffers. But as I understand it, specifically for the case where `initialCredit == 0` it should return 0. So we need to think about how to do this better.
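
One possible split, sketched with hypothetical names: the subpartition keeps reporting its real backlog, and the `initialCredit == 0` special case is applied where the backlog is announced. This assumes the announcing side knows both the blocked state and the consumer's initial credit, which may not hold in the actual code:

```java
final class AnnouncedBacklogSketch {
    private AnnouncedBacklogSketch() {}

    /**
     * The subpartition's real backlog is passed through unchanged, except that a blocked
     * subpartition whose consumer has no exclusive buffers announces 0.
     */
    static int backlogToAnnounce(int buffersInBacklog, boolean isBlocked, int initialCredit) {
        if (isBlocked && initialCredit == 0) {
            return 0;
        }
        return buffersInBacklog;
    }
}
```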



