[ https://issues.apache.org/jira/browse/KAFKA-9965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390950#comment-17390950 ]
Luke Chen commented on KAFKA-9965:
----------------------------------

[~cmccabe] [~ijuma], I have a proposal for this issue. In the WIP PR [https://github.com/apache/kafka/pull/8690], we tried to fix `RoundRobinPartitioner` by decrementing the counter in the `onNewBatch` callback. However, that cannot fix third-party partitioners that don't implement `onNewBatch`.

The root cause is that we added a new method, `onNewBatch`, in Kafka 2.4. Some older third-party partitioners don't implement it, yet the producer still calls them twice when a new batch is started. A partitioner that doesn't implement `onNewBatch` has no reason to choose a different partition for a new batch (e.g. `RoundRobinPartitioner`), so in that case we should not call the partitioner a second time.

I'm thinking we can throw an exception from the default `onNewBatch` method, like this:

{code:java}
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    // Originally the default implementation was empty; now we throw an exception here.
    throw new UnsupportedOperationException();
}
{code}

This way we don't change the API, we can tell that the partitioner doesn't implement `onNewBatch`, and the producer can directly use the partition returned by the first partitioner call, e.g.:

{code:java}
// KafkaProducer#doSend
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    // first time we call the partitioner
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ...
    log.trace("Attempting to append record {} with callback {} to topic {} partition {}",
            record, callback, record.topic(), partition);
    // try to append the record, aborting if a new batch would be needed
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

    // new batch case
    if (result.abortForNewBatch) {
        int prevPartition = partition;
        // Previously: partitioner.onNewBatch(record.topic(), cluster, prevPartition);
        // Now we bypass the second partition() call if `onNewBatch` throws.
        try {
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            // Second partitioner call: only reached if onNewBatch is implemented.
            partition = partition(record, serializedKey, serializedValue, cluster);
        } catch (UnsupportedOperationException e) {
            // Ignore: the partitioner doesn't care about the new-batch case.
        }
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}",
                record.topic(), partition, prevPartition);
        ...
{code}

What do you think?

> Uneven distribution with RoundRobinPartitioner in AK 2.4+
> ---------------------------------------------------------
>
>                 Key: KAFKA-9965
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9965
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: Michael Bingham
>            Priority: Major
>
> {{RoundRobinPartitioner}} states that it will provide equal distribution of
> records across partitions. However, with the enhancements made in KIP-480, it
> may not.
> In some cases, when a new batch is started, the partitioner may be
> called a second time for the same record:
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909]
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934]
> Each time the partitioner is called, it increments a counter in
> {{RoundRobinPartitioner}}, so this can result in unequal distribution.
> The easiest fix might be to decrement the counter in
> {{RoundRobinPartitioner#onNewBatch}}.
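The double-call effect and the two candidate fixes can be compared in a small, self-contained simulation. This is a sketch, not Kafka code: `MiniPartitioner`, `RoundRobin`, `EmptyOnNewBatch`, `DecrementOnNewBatch` and `sendAll` are hypothetical stand-ins for `Partitioner`, `RoundRobinPartitioner` and the `KafkaProducer#doSend` flow. It assumes that every append aborts for a new batch (for example, because each batch is drained before the next record arrives), and it compares the empty 2.4 default, a counter decrement in `onNewBatch` (the WIP PR approach), and the proposed throwing default.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified simulation, not Kafka code: every name here is a hypothetical stand-in.
final class RoundRobinSkewDemo {

    // Hypothetical stand-in for Kafka's Partitioner, using the proposed throwing default.
    interface MiniPartitioner {
        int partition(String topic, int numPartitions);

        default void onNewBatch(String topic, int prevPartition) {
            // Proposed behavior: signal "not implemented" instead of a silent no-op.
            throw new UnsupportedOperationException();
        }
    }

    // Round-robin partitioner that advances its counter on every call and does not
    // override onNewBatch, so it inherits the throwing default.
    static class RoundRobin implements MiniPartitioner {
        protected final AtomicInteger counter = new AtomicInteger();

        @Override
        public int partition(String topic, int numPartitions) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
    }

    // Models Kafka 2.4, where the default onNewBatch is an empty no-op.
    static class EmptyOnNewBatch extends RoundRobin {
        @Override
        public void onNewBatch(String topic, int prevPartition) { }
    }

    // Models the decrement approach: undo the increment from the first call.
    static class DecrementOnNewBatch extends RoundRobin {
        @Override
        public void onNewBatch(String topic, int prevPartition) {
            counter.decrementAndGet();
        }
    }

    // Simplified doSend loop. Assumption: every append aborts for a new batch.
    // Returns the number of records sent to each partition.
    static int[] sendAll(MiniPartitioner partitioner, int records, int numPartitions) {
        int[] perPartition = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            int partition = partitioner.partition("topic", numPartitions); // first call
            try {
                partitioner.onNewBatch("topic", partition);
                partition = partitioner.partition("topic", numPartitions); // second call
            } catch (UnsupportedOperationException e) {
                // The partitioner ignores new batches: keep the first choice.
            }
            perPartition[partition]++;
        }
        return perPartition;
    }

    public static void main(String[] args) {
        System.out.println("empty onNewBatch:     " + Arrays.toString(sendAll(new EmptyOnNewBatch(), 8, 4)));
        System.out.println("decrement onNewBatch: " + Arrays.toString(sendAll(new DecrementOnNewBatch(), 8, 4)));
        System.out.println("throwing default:     " + Arrays.toString(sendAll(new RoundRobin(), 8, 4)));
    }
}
```

With 4 partitions and 8 records, the empty default sends every record to partitions 1 and 3 ([0, 4, 0, 4]), while both the decrement and the throwing default give [2, 2, 2, 2]. The difference is where the fix lives: the decrement has to be written into each partitioner, whereas the throwing default lets the producer skip the second call for any partitioner that does not override `onNewBatch`, without changing the interface, at the cost of a try/catch on the new-batch path.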