akalash commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r661557618

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
         updateStatistics(current);
-        // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
-        // next header
-        Buffer.DataType nextDataType =
-                numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+        // We simply assume all the data except for the last one (EndOfPartitionEvent)
+        // are non-events for batch jobs to avoid pre-fetching the next header
+        Buffer.DataType nextDataType = Buffer.DataType.NONE;
+        if (numDataBuffers > 0) {
+            nextDataType = Buffer.DataType.DATA_BUFFER;
+        } else if (numDataAndEventBuffers > 0) {
+            nextDataType = Buffer.DataType.EVENT_BUFFER;
+        }

Review comment:
   Was this also a bug? If not, why do we distinguish EVENT_BUFFER and DATA_BUFFER now?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -130,11 +136,11 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
     /** Requests exclusive buffers from the provider. */
     void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
-        checkArgument(
-                !segments.isEmpty(),
-                "The number of exclusive buffers per channel should be larger than 0.");
+        if (numExclusiveBuffers <= 0) {

Review comment:
   As I understand it, a negative number is still illegal. So maybe it makes sense to add a checkArgument for `numExclusiveBuffers < 0`?
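   A minimal sketch of the check I have in mind, assuming zero stays a legal value that simply skips the request. The class `ExclusiveBufferRequestSketch` and its local `checkArgument` helper are hypothetical stand-ins so that the snippet is self-contained; the real method would keep using Flink's own `Preconditions.checkArgument` and request segments from the global pool.

```java
/** Hypothetical, simplified stand-in for the request path; not the real BufferManager. */
class ExclusiveBufferRequestSketch {

    /** Local stand-in for a precondition helper (assumption, for self-containment). */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Returns how many exclusive buffers would be requested. Negative values are
     * rejected, zero is accepted and results in no request at all.
     */
    static int requestExclusiveBuffers(int numExclusiveBuffers) {
        checkArgument(
                numExclusiveBuffers >= 0,
                "The number of exclusive buffers per channel should not be negative.");
        if (numExclusiveBuffers == 0) {
            // nothing to request when the channel has no exclusive buffers
            return 0;
        }
        // the real code would request memory segments from the global pool here
        return numExclusiveBuffers;
    }

    public static void main(String[] args) {
        System.out.println(requestExclusiveBuffers(0));
        System.out.println(requestExclusiveBuffers(2));
        try {
            requestExclusiveBuffers(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   This keeps the early return for zero while still failing fast on a negative configuration value.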

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -110,7 +114,12 @@ private void updateStatistics(Buffer buffer) {
     public boolean isAvailable(int numCreditsAvailable) {
         // We simply assume there are no events except EndOfPartitionEvent for bath jobs,
         // then it has no essential effect to ignore the judgement of next event buffer.
-        return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+        return (numCreditsAvailable > 0 || numDataBuffers == 0) && numDataAndEventBuffers > 0;

Review comment:
   Does this mean that we don't need credit to send the event?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -100,8 +106,19 @@ public void addCredit(int creditDeltas) {
         numCreditsAvailable += creditDeltas;
     }
+    @Override
+    public boolean needAnnounceBacklog() {
+        return initialCredit == 0 && numCreditsAvailable == 0;
+    }
+
     @Override
     public void resumeConsumption() {
+        if (initialCredit == 0) {
+            // reset available credit if no exclusive buffer is available at the
+            // consumer side for all floating buffers must have been released
+            numCreditsAvailable = 0;

Review comment:
   Do we do this because we know that all floating buffers will have been released before the checkpoint when `initialCredit == 0`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }
+    /**
+     * Announces remaining backlog to the consumer after the available data notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {

Review comment:
   As I understand it, the backlog cannot be less than or equal to 0 here. So maybe convert it to a checkArgument?
   Or did I miss something?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -340,7 +355,15 @@ private void decodeBufferOrEvent(
             RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
             throws Throwable {
         if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
-            inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            try {
+                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            } finally {
+                // recycle the empty buffer directly
+                Buffer buffer = bufferOrEvent.getBuffer();
+                if (buffer != null) {
+                    buffer.recycleBuffer();

Review comment:
   Where was this buffer recycled before these changes? Or was it a bug?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -288,9 +288,7 @@ BufferAndBacklog pollBuffer() {
             if (buffers.isEmpty()) {
                 flushRequested = false;
-            }
-
-            while (!buffers.isEmpty()) {
+            } else {

Review comment:
   I don't really get the idea of these changes. Let's suppose PipelinedSubpartition#buffers contains several buffers, but the first one is empty and already finished.

   How it was before the changes:
   - PartitionRequestQueue requests the buffer.
   - In any case, PipelinedSubpartition#pollBuffer returns a buffer (it skips the first one because it is empty and finished, and returns the buffer from the next consumer).
   - PartitionRequestQueue continues to request from this reader until PipelinedSubpartition#buffers is empty.

   After the changes:
   - PartitionRequestQueue requests the buffer.
   - PipelinedSubpartition#pollBuffer returns null.
   - PartitionRequestQueue removes this reader from the available readers.
   - The other buffers in PipelinedSubpartition#buffers will be sent only when a timeout happens and this reader is added to the available list again.
   What is the point of delaying the send if we already have credit for it and a buffer that is ready to be sent?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {
+                return 0;
+            }
+
+            if (flushRequested
+                    || isFinished
+                    || !checkNotNull(buffers.peekLast()).getBufferConsumer().isBuffer()) {

Review comment:
   This looks dangerous. Why are you so sure that buffers contains at least one element?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                         channelInfo,
                         channelStatePersister,
                         next.getSequenceNumber());
+
+        // ignore the empty buffer directly
+        if (buffer.readableBytes() == 0) {
+            buffer.recycleBuffer();

Review comment:
   The same question as earlier: is this a bug? Or where did we recycle the buffer before these changes?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(
     // ------------------------------------------------------------------------
     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {
         synchronized (bufferQueue) {
+            // decrease the number of buffers require to avoid the possibility of
+            // allocating more than required buffers after the buffer is taken
+            if (initialCredit == 0) {
+                --numRequiredBuffers;

Review comment:
   I don't understand why `initialCredit == 0` should be handled differently here. Even if `initialCredit == 1` and the buffer is requested, we should decrease this value, or am I wrong?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);

Review comment:
   Do we need to do this because we released all buffers in onBlockingUpstream? If so, can we keep this code in one place, e.g. by moving it into onBlockingUpstream?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(
     // ------------------------------------------------------------------------
     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {

Review comment:
   As I understand it, initialCredit is an immutable value, and BufferManager and AvailableBufferQueue know this value, so maybe it is better to avoid this parameter?
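   A rough sketch of the alternative, assuming the initial credit is fixed when the manager is constructed. `BufferManagerSketch`, its constructor, and its fields are hypothetical simplifications and not the real BufferManager; buffers are modelled as plain strings, and the decrement simply mirrors the condition from the diff above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified model of a buffer manager that stores the initial credit once. */
class BufferManagerSketch {

    // fixed at construction time, so callers never have to pass it again
    private final int initialCredit;

    private final Deque<String> bufferQueue = new ArrayDeque<>();

    private int numRequiredBuffers;

    BufferManagerSketch(int initialCredit, int numRequiredBuffers) {
        this.initialCredit = initialCredit;
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void addBuffer(String buffer) {
        synchronized (bufferQueue) {
            bufferQueue.add(buffer);
        }
    }

    /** Same condition as in the diff, but without the extra parameter. Returns null if empty. */
    String requestBuffer() {
        synchronized (bufferQueue) {
            if (initialCredit == 0) {
                --numRequiredBuffers;
            }
            return bufferQueue.poll();
        }
    }

    int getNumRequiredBuffers() {
        synchronized (bufferQueue) {
            return numRequiredBuffers;
        }
    }

    public static void main(String[] args) {
        BufferManagerSketch manager = new BufferManagerSketch(0, 2);
        manager.addBuffer("floating-1");
        System.out.println(manager.requestBuffer());
        System.out.println(manager.getNumRequiredBuffers());
    }
}
```

   With the value held in a final field, the call site stays `requestBuffer()` and the zero-credit special case is hidden inside the manager.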

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);
+        }
+
         // notifies the producer that this channel is ready to
         // unblock from checkpoint and resume data consumption
         partitionRequestClient.resumeConsumption(this);
     }
+    private void onBlockingUpstream() {
+        if (initialCredit == 0) {

Review comment:
   Why can't we do the same thing for any number of credits, not only for `initialCredit == 0`? Or would it slow down the load after resuming?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -215,9 +221,15 @@ public void recycle(MemorySegment segment) {
     }
     void releaseFloatingBuffers() {
+        Queue<Buffer> buffers;
         synchronized (bufferQueue) {
             numRequiredBuffers = 0;
-            bufferQueue.releaseFloatingBuffers();
+            buffers = bufferQueue.clearFloatingBuffers();
+        }
+
+        // recycle all buffers out of the synchronization block to avoid dead lock

Review comment:
   Can you explain what kind of deadlock can happen? Between LocalBufferPool and BufferManager?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                         channelInfo,
                         channelStatePersister,
                         next.getSequenceNumber());
+
+        // ignore the empty buffer directly
+        if (buffer.readableBytes() == 0) {

Review comment:
   How is it even possible to get an empty buffer? Who is sending it?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {

Review comment:
   This seems to be the wrong place for such a condition. Logically, even if the subpartition is blocked, it still has buffers. But as I understand it, it should return 0 specifically for the case where `initialCredit == 0`. So we need to think about how to do this better.