[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662168#comment-15662168 ]
Jun Yao commented on KAFKA-4402:
--------------------------------
Hi Ewen,
I updated the description. I am looking at the same producer code as you are.
The issue is that the counter is shared by all topics instead of being kept per topic.
I added a unit test in my PR to validate this; without the fix, the result is
not balanced.
https://github.com/apache/kafka/pull/2128/files#diff-f30df3b3b79e9be0de6c94dcce90a56e
Meanwhile, I also ran a local test from the producer side to validate this:
bin/kafka-topics.sh --create --topic mtest0 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
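import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerPartitionTest {

    private Producer<String, String> producer;

    public static void main(String[] args) {
        KafkaProducerPartitionTest kafkaProducerPartitionTest = new KafkaProducerPartitionTest();
        try {
            kafkaProducerPartitionTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws InterruptedException {
        initProducer();
        // Number of acknowledged records per partition of loopTopic.
        Map<Integer, Integer> partitionCount = new HashMap<>();
        String loopTopic = "mtest0";
        // 300 records go to "mtest0" and 60 go to "test".
        CountDownLatch latch = new CountDownLatch(360);
        Callback c = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Count only successful sends to loopTopic; metadata may be null on failure.
                if (exception == null && loopTopic.equalsIgnoreCase(metadata.topic())) {
                    partitionCount.put(metadata.partition(),
                            partitionCount.getOrDefault(metadata.partition(), 0) + 1);
                }
                latch.countDown();
            }
        };
        for (int i = 0; i < 300; ++i) {
            producer.send(new ProducerRecord<String, String>(loopTopic, "" + i), c);
            // Every fifth iteration, interleave one record to the second topic.
            if (i % 5 == 0) {
                producer.send(new ProducerRecord<String, String>("test", "a"), c);
            }
        }
        latch.await();
        producer.close();
        System.out.println("partitionCount=" + partitionCount);
    }

    public void initProducer() {
        try {
            Properties props = new Properties();
            props.load(getClass().getClassLoader().getResourceAsStream("kafka-config.properties"));
            producer = new KafkaProducer<>(props);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}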
Without the fix, it prints
partitionCount={0=60, 1=120, 2=120}
With the fix, it prints
partitionCount={0=100, 1=100, 2=100}
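
To show where a 60/120/120 split can come from, here is a small dependency-free simulation of the send loop. It is only a sketch and not the Kafka code: the class name CounterSimulation and its methods are hypothetical, the counter start value of 2 is an assumption (the real counter starts at a random value, so a different partition may be the short one), and it assumes the computed index maps directly to the partition id.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Dependency-free simulation of the test loop; this is not Kafka code.
public class CounterSimulation {

    // Returns how many "mtest0" records land on each of its 3 partitions.
    // perTopic=false models one counter shared by all topics;
    // perTopic=true models a separate counter for each topic.
    // start is an assumed initial counter value.
    public static int[] simulate(boolean perTopic, int start) {
        Map<String, Integer> counters = new HashMap<>();
        int[] counts = new int[3];
        for (int i = 0; i < 300; i++) {
            counts[next(counters, perTopic, "mtest0", start) % 3]++;
            if (i % 5 == 0) {
                // The single-partition topic still consumes a counter value.
                next(counters, perTopic, "test", start);
            }
        }
        return counts;
    }

    // Get-and-increment on whichever counter applies to this topic.
    private static int next(Map<String, Integer> counters, boolean perTopic,
                            String topic, int start) {
        String key = perTopic ? topic : "shared";
        int value = counters.getOrDefault(key, start);
        counters.put(key, value + 1);
        return value;
    }

    public static void main(String[] args) {
        System.out.println("shared counter:    " + Arrays.toString(simulate(false, 2)));
        System.out.println("per-topic counter: " + Arrays.toString(simulate(true, 2)));
    }
}
```

With a shared counter, each group of five "mtest0" sends plus one "test" send advances the counter by six, so the value taken by "test" always has the same remainder modulo 3 and one "mtest0" partition is skipped once per group. Under the assumptions above, the simulation gives [60, 120, 120] for the shared counter and [100, 100, 100] for the per-topic counter.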
> Kafka Producer's DefaultPartitioner is actually not round robin as said in
> the code comments "If no partition or key is present choose a partition in a
> round-robin fashion"
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor
>
> The code comment says that the Kafka client producer's DefaultPartitioner
> does round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does appear to attempt round robin: it maintains a counter,
> increments it on every call, and uses the value to decide which
> partition to send to.
> However, the counter is global and shared by all topics, so the result is
> not round robin per topic, and it sometimes causes unbalanced routing
> across a topic's partitions.
> Although we can pass a custom implementation of the interface
> "org.apache.kafka.clients.producer.Partitioner", it would still be good to
> make the default implementation truly round robin, as the comment says.