junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r964182091


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -378,6 +415,16 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
     }
 
     /**
+     * Check if there are ready batches in the queue, or we sent all batches.
+     */
+    private boolean queueHasReadyBatches(Deque<ProducerBatch> deque, long nowMs) {
+        // Note that we also check if the queue is empty, because that may mean that batches became
+        // ready and we sent them.
+        ProducerBatch last = deque.peekLast();
+        return deque.size() > 1 || last == null || last.isFull() || last.waitedTimeMs(nowMs) >= lingerMs;

Review Comment:
   Here, we are specifically handling the case where lingerMs > 0. I am
wondering whether the issue described in the Jira could also occur when
lingerMs is 0. With lingerMs = 0, back pressure means the effective batch
size is typically somewhere between 1 and batch.size. With the built-in
partitioner, we could therefore still hit the issue when a large batch is
followed by a small one.
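
   To make the concern concrete, here is a minimal sketch. It assumes a
simplified, hypothetical stand-in for ProducerBatch (the `Batch` class and
`ReadyBatchSketch` are not Kafka code) and passes lingerMs in as a parameter
instead of reading a field. Under those assumptions, the last clause of the
check holds for any batch once lingerMs is 0, so the check cannot tell a
freshly created small batch from one that is actually ready.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReadyBatchSketch {
    /** Hypothetical stand-in for ProducerBatch; only what the check needs. */
    static final class Batch {
        final boolean full;
        final long createdMs;

        Batch(boolean full, long createdMs) {
            this.full = full;
            this.createdMs = createdMs;
        }

        boolean isFull() {
            return full;
        }

        // Assumption: waited time is never negative.
        long waitedTimeMs(long nowMs) {
            return Math.max(0, nowMs - createdMs);
        }
    }

    // Same condition as in the diff, with lingerMs passed in explicitly.
    static boolean queueHasReadyBatches(Deque<Batch> deque, long nowMs, long lingerMs) {
        Batch last = deque.peekLast();
        return deque.size() > 1 || last == null || last.isFull()
            || last.waitedTimeMs(nowMs) >= lingerMs;
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        // One small, non-full batch created "just now" (nowMs == createdMs).
        deque.add(new Batch(false, 100L));

        // lingerMs = 0: waited time 0 >= 0, so the queue always looks ready.
        System.out.println(queueHasReadyBatches(deque, 100L, 0L)); // true
        // lingerMs = 5: the same batch is not ready yet.
        System.out.println(queueHasReadyBatches(deque, 100L, 5L)); // false
    }
}
```

   In this sketch the check only filters anything when lingerMs > 0, which is
why the lingerMs = 0 case may need separate handling.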



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
