soarez commented on code in PR #12752: URL: https://github.com/apache/kafka/pull/12752#discussion_r996346998
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -127,7 +127,9 @@ public RecordAccumulator(LogContext logContext, this.closed = false; this.flushesInProgress = new AtomicInteger(0); this.appendsInProgress = new AtomicInteger(0); - this.batchSize = batchSize; + // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable + // batching which in practice actually means using a batch size of 1. + this.batchSize = Math.max(1, batchSize); Review Comment: Is there a reason why we'd prefer that? I only see disadvantages in sanitizing this value there: 1. In `KafkaProducer` the only other use of this config is in `BufferPool` which has not changed for a long while. The breaking behavior stems from recent changes that are localized to the `RecordAccumulator` and the `BuiltinPartitioner`. It does seem like `BufferPool` will work the same with buffer size either 0 or 1, but still, **if we focus this fix on the components where the bug was introduced we avoid causing further unintentional changes to previous behavior**. 2. If the change is made in `RecordAccumulator` **we can use the partition change callbacks to test for this bug**. To detect this bug with a test against `KafkaProducer` we'll need to execute `producer.send(record)` in a separate thread under a timeout, which isn't as nice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org