[ 
https://issues.apache.org/jira/browse/KAFKA-9965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390950#comment-17390950
 ] 

Luke Chen commented on KAFKA-9965:
----------------------------------

[~cmccabe] [~ijuma], I have a proposal for this issue. In the current WIP PR, 
[https://github.com/apache/kafka/pull/8690], we tried to fix 
`RoundRobinPartitioner` by decrementing the counter in the `onNewBatch` callback. 
However, that fix cannot cover third-party partitioners that don't implement 
the `onNewBatch` method.
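
To make the double call concrete, here is a minimal simulation. It is only a sketch: `DemoPartitioner`, `PlainRoundRobin`, `DecrementingRoundRobin`, and `DoubleCallDemo` are hypothetical, simplified stand-ins (not Kafka's real `Partitioner` signature), and it assumes the worst case where every record starts a new batch.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for Kafka's Partitioner interface.
interface DemoPartitioner {
    int partition(int numPartitions);

    // Empty default, mirroring the default introduced in Kafka 2.4.
    default void onNewBatch(int prevPartition) { }
}

// Round-robin partitioner that does not override onNewBatch.
class PlainRoundRobin implements DemoPartitioner {
    final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(int numPartitions) {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}

// Same partitioner, but it undoes the first increment when a new batch starts.
class DecrementingRoundRobin extends PlainRoundRobin {
    @Override
    public void onNewBatch(int prevPartition) {
        counter.decrementAndGet();
    }
}

class DoubleCallDemo {
    // Assumption: every record triggers the new-batch path (worst case).
    static int[] distribute(DemoPartitioner p, int numPartitions, int records) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            int partition = p.partition(numPartitions); // first call
            p.onNewBatch(partition);                    // new batch callback
            partition = p.partition(numPartitions);     // second call, same record
            counts[partition]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Without the decrement, every record lands on partition 1.
        System.out.println(Arrays.toString(distribute(new PlainRoundRobin(), 2, 4)));        // [0, 4]
        // With the decrement, the distribution is even again.
        System.out.println(Arrays.toString(distribute(new DecrementingRoundRobin(), 2, 4))); // [2, 2]
    }
}
```

The decrement only helps partitioners that override `onNewBatch`; any partitioner that keeps the empty default behaves like `PlainRoundRobin` here.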

 

The root cause is that we added a new method, `onNewBatch`, in Kafka 2.4, 
and some older third-party partitioners don't implement it, which causes us to 
call the partitioner twice for the same record. A partitioner that doesn't 
implement `onNewBatch` does not want a different partition when a new batch is 
created (e.g. RoundRobinPartitioner), so we should not call the partitioner a 
second time in these cases. I'm thinking we can throw an exception from the 
default `onNewBatch` method, like this:
{code:java}
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    // Originally the default implementation was empty; now it throws an exception.
    throw new UnsupportedOperationException();
}
{code}
 

This way, we don't change the API, and the producer can tell that the 
partitioner doesn't implement `onNewBatch`, so it can directly reuse the 
partition returned by the first `partition()` call, e.g.:

{code:java}
// KafkaProducer#doSend
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
  ...
  // first call to the partitioner
  int partition = partition(record, serializedKey, serializedValue, cluster);
  ...
  log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
  // try to append the record; abort if a new batch has to be created
  RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

  // new batch case
  if (result.abortForNewBatch) {
    int prevPartition = partition;
    // previously: partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // now we skip the second partition() call if `onNewBatch` throws
    try {
      partitioner.onNewBatch(record.topic(), cluster, prevPartition);
      // second call to the partitioner; skipped if onNewBatch is not implemented
      partition = partition(record, serializedKey, serializedValue, cluster);
    } catch (UnsupportedOperationException e) {
      // ignore: the partitioner doesn't care about new batches, so keep prevPartition
    }

    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    ...
{code}
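
To check the idea in isolation, the fallback can be sketched outside the producer. Everything below is a hypothetical simplification: `SketchPartitioner`, `SketchRoundRobin`, and `NewBatchFallbackSketch` are made-up names, the signatures are reduced compared to the real `Partitioner`, and the `newBatch` flag stands in for `result.abortForNewBatch`.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for Kafka's Partitioner interface.
interface SketchPartitioner {
    int partition(String topic, int numPartitions);

    // Proposed default: signal that this partitioner does not react to new batches.
    default void onNewBatch(String topic, int prevPartition) {
        throw new UnsupportedOperationException();
    }
}

// Round-robin partitioner that, like older third-party partitioners,
// does not override onNewBatch.
class SketchRoundRobin implements SketchPartitioner {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(String topic, int numPartitions) {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}

class NewBatchFallbackSketch {
    // Mimics the partition selection in doSend; newBatch stands in for
    // result.abortForNewBatch.
    static int choosePartition(SketchPartitioner partitioner, String topic,
                               int numPartitions, boolean newBatch) {
        int partition = partitioner.partition(topic, numPartitions); // first call
        if (newBatch) {
            int prevPartition = partition;
            try {
                partitioner.onNewBatch(topic, prevPartition);
                // Second call, reached only if onNewBatch is implemented.
                partition = partitioner.partition(topic, numPartitions);
            } catch (UnsupportedOperationException e) {
                // Partitioner ignores new batches: keep prevPartition.
            }
        }
        return partition;
    }

    // Assumption: every record hits the new-batch path (worst case).
    static int[] distribute(SketchPartitioner partitioner, int numPartitions, int records) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[choosePartition(partitioner, "demo-topic", numPartitions, true)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // The partitioner is called once per record, so the spread stays even.
        System.out.println(Arrays.toString(distribute(new SketchRoundRobin(), 4, 8))); // [2, 2, 2, 2]
    }
}
```

In this sketch a partitioner that does override `onNewBatch` still gets the second `partition()` call, so the behavior introduced by KIP-480 for such partitioners is unchanged.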
 

What do you think?

 

> Uneven distribution with RoundRobinPartitioner in AK 2.4+
> ---------------------------------------------------------
>
>                 Key: KAFKA-9965
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9965
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: Michael Bingham
>            Priority: Major
>
> {{RoundRobinPartitioner}} states that it will provide equal distribution of 
> records across partitions. However with the enhancements made in KIP-480, it 
> may not. In some cases, when a new batch is started, the partitioner may be 
> called a second time for the same record:
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909]
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934]
> Each time the partitioner is called, it increments a counter in 
> {{RoundRobinPartitioner}}, so this can result in unequal distribution.
> Easiest fix might be to decrement the counter in 
> {{RoundRobinPartitioner#onNewBatch}}.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
