artemlivshits commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r967463676


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
             assertEquals(partition1, partition.get());
             assertEquals(2, mockRandom.get());
 
-            // Produce large record, we should switch to next partition.
+            // Produce large record, we switched to next partition by previous 
produce, but
+            // for this produce the switch would be disabled because of 
incomplete batch.
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, 
largeValue, Record.EMPTY_HEADERS,
                 callbacks, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
             assertEquals(partition2, partition.get());
-            assertEquals(3, mockRandom.get());
+            assertEquals(2, mockRandom.get());

Review Comment:
   The partition switching logic is now changed to switch after the batch is 
full, so just producing the required amount is not enough.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -378,6 +415,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer 
buffer, byte maxUsableMag
     }
 
     /**
+     * Check if all batches in the queue are full.
+     */
+    private boolean allBatchesFull(Deque<ProducerBatch> deque) {
+        // Only the last batch may be incomplete, so we just check that.
+        ProducerBatch last = deque.peekLast();
+        return last == null || last.isFull();

Review Comment:
   That's the only functional change in this commit, the rest is renames, 
comment updates and unit test fixes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to