artemlivshits commented on code in PR #12570: URL: https://github.com/apache/kafka/pull/12570#discussion_r967463676
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
             assertEquals(partition1, partition.get());
             assertEquals(2, mockRandom.get());
 
-            // Produce large record, we should switch to next partition.
+            // Produce large record, we switched to next partition by previous produce, but
+            // for this produce the switch would be disabled because of incomplete batch.
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(partition2, partition.get());
-            assertEquals(3, mockRandom.get());
+            assertEquals(2, mockRandom.get());

Review Comment:
   The partition switching logic now switches only after the current batch is full, so just producing the required amount of data is no longer enough to trigger a switch.

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -378,6 +415,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
     }
 
     /**
+     * Check if all batches in the queue are full.
+     */
+    private boolean allBatchesFull(Deque<ProducerBatch> deque) {
+        // Only the last batch may be incomplete, so we just check that.
+        ProducerBatch last = deque.peekLast();
+        return last == null || last.isFull();

Review Comment:
   This is the only functional change in this commit; the rest is renames, comment updates, and unit test fixes.
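
To illustrate the `allBatchesFull` check outside of the accumulator, here is a minimal, self-contained sketch. The `Batch` class and the `AllBatchesFullSketch` wrapper are hypothetical stand-ins invented for this example. In particular, `Batch.isFull()` here is a simple byte-count comparison and should not be read as how `ProducerBatch.isFull()` is actually implemented. The sketch only shows why checking the last batch is sufficient when batches are appended in order, and why a partially filled tail batch would hold back a partition switch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AllBatchesFullSketch {

    /** Hypothetical stand-in for ProducerBatch: tracks bytes used against a fixed capacity. */
    static final class Batch {
        private final int capacity;
        private int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        /** Appends size bytes if they fit; returns false (and changes nothing) otherwise. */
        boolean tryAppend(int size) {
            if (used + size > capacity) {
                return false;
            }
            used += size;
            return true;
        }

        /** Simplified assumption: a batch is full once it has no remaining capacity. */
        boolean isFull() {
            return used >= capacity;
        }
    }

    /**
     * Mirrors the check from the diff: records are only appended to the tail batch,
     * so the tail is the only batch that can still be incomplete.
     */
    static boolean allBatchesFull(Deque<Batch> deque) {
        Batch last = deque.peekLast();
        return last == null || last.isFull();
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        System.out.println(allBatchesFull(deque)); // empty queue: true

        Batch batch = new Batch(10);
        deque.addLast(batch);
        batch.tryAppend(6);
        System.out.println(allBatchesFull(deque)); // tail is partially filled: false

        batch.tryAppend(4);
        System.out.println(allBatchesFull(deque)); // tail is now full: true
    }
}
```

Under these assumptions, a caller that gates partition switching on `allBatchesFull` would stay on the current partition in the middle case, which matches the updated test expectation that `mockRandom` is not advanced while the batch is incomplete.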