soarez commented on code in PR #12752:
URL: https://github.com/apache/kafka/pull/12752#discussion_r996346998


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -127,7 +127,9 @@ public RecordAccumulator(LogContext logContext,
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
         this.appendsInProgress = new AtomicInteger(0);
-        this.batchSize = batchSize;
+        // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
+        // batching which in practice actually means using a batch size of 1.
+        this.batchSize = Math.max(1, batchSize);

Review Comment:
   Is there a reason why we'd prefer that? I only see disadvantages in sanitizing the value there:
   
   1. In `KafkaProducer` the only other use of this config is in `BufferPool`, which has not changed in a long while. The breaking behavior stems from recent changes that are localized to `RecordAccumulator` and the `BuiltinPartitioner`. It does seem like `BufferPool` will work the same with a buffer size of either 0 or 1, but still, **if we focus this fix on the components where the bug was introduced, we avoid causing further unintentional changes to previous behavior**.
   2. If the change is made in `RecordAccumulator`, **we can use the partition change callbacks to test for this bug**. To detect the bug with a test against `KafkaProducer`, we would need to execute `producer.send(record)` in a separate thread under a timeout, which isn't as nice.
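   For reference, the fix in the diff above amounts to clamping the configured batch size once, at construction time. A minimal standalone sketch of that idea (the class and method names here are hypothetical, not the actual Kafka internals):
   
   ```java
   public class BatchSizeClamp {
       // Simplified stand-in for RecordAccumulator's batchSize field.
       private final int batchSize;
   
       public BatchSizeClamp(int configuredBatchSize) {
           // batch.size=0 means "disable batching", which in practice is a
           // batch of one record, so clamp to a minimum of 1 to avoid
           // zero-sized batch arithmetic downstream.
           this.batchSize = Math.max(1, configuredBatchSize);
       }
   
       public int batchSize() {
           return batchSize;
       }
   
       public static void main(String[] args) {
           System.out.println(new BatchSizeClamp(0).batchSize());     // clamped to 1
           System.out.println(new BatchSizeClamp(16384).batchSize()); // left unchanged
       }
   }
   ```
   
   Doing this in one place where the value is consumed, rather than at the `KafkaProducer` level, keeps the behavior change scoped to the component being fixed.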
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
