This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new f670e9baa30 KAFKA-17632: Fix RoundRobinPartitioner for even partition
counts (#17620)
f670e9baa30 is described below
commit f670e9baa307d1b0abde871de17153a00d1fdd0a
Author: Dániel Urbán <[email protected]>
AuthorDate: Tue Nov 12 15:44:36 2024 +0100
KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (#17620)
RoundRobinPartitioner does not handle the fact that on new batch creation,
the partition method is called twice.
Reviewers: Viktor Somogyi-Vass <[email protected]>, Mickael Maison
<[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 2 +
.../apache/kafka/clients/producer/Partitioner.java | 3 ++
.../clients/producer/RoundRobinPartitioner.java | 17 ++++++++-
.../producer/RoundRobinPartitionerTest.java | 44 +++++++++++++++++++++-
4 files changed, 64 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d0aa37ec7f0..2e11f6d81f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1074,6 +1074,8 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
if (result.abortForNewBatch) {
int prevPartition = partition;
+ // IMPORTANT NOTE: the following onNewBatch and partition
calls should not be interrupted to allow
+ // the custom partitioner to correctly track its state
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue,
cluster);
if (log.isTraceEnabled()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index bcfcb2db646..3db3c3a31eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -49,6 +49,9 @@ public interface Partitioner extends Configurable, Closeable {
* <p>
* Notifies the partitioner a new batch is about to be created. When using
the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
+ * <p>
+ * After onNewBatch, the {@link #partition(String, Object, byte[], Object,
byte[], Cluster)} method is called again,
+ * which allows the implementation to "redirect" the message on new batch
creation.
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record
that triggered a new batch
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
index be2bc24a509..1ad55fe8cda 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
@@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new
ConcurrentHashMap<>();
+ private final ThreadLocal<TopicPartition> previousPartition = new
ThreadLocal<>();
public void configure(Map<String, ?> configs) {}
@@ -51,6 +53,14 @@ public class RoundRobinPartitioner implements Partitioner {
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster) {
+ TopicPartition prevPartition = previousPartition.get();
+ if (prevPartition != null) {
+ previousPartition.remove();
+ if (topic.equals(prevPartition.topic())) {
+ return prevPartition.partition();
+ }
+ }
+
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
@@ -68,6 +78,11 @@ public class RoundRobinPartitioner implements Partitioner {
return counter.getAndIncrement();
}
- public void close() {}
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+ previousPartition.set(new TopicPartition(topic, prevPartition));
+ }
+ public void close() {}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
index 37f35b0a5a3..33af9af1643 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
@@ -97,6 +97,7 @@ public class RoundRobinPartitionerTest {
assertEquals(10, partitionCount.get(2).intValue());
}
+ @SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytes() {
final String topicA = "topicA";
@@ -113,6 +114,10 @@ public class RoundRobinPartitionerTest {
Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 30; ++i) {
int partition = partitioner.partition(topicA, null, null, null,
null, testCluster);
+ // Simulate single-message batches
+ partitioner.onNewBatch(topicA, testCluster, partition);
+ int nextPartition = partitioner.partition(topicA, null, null,
null, null, testCluster);
+ assertEquals(partition, nextPartition, "New batch creation should
not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
@@ -126,5 +131,42 @@ public class RoundRobinPartitionerTest {
assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
- }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
+ final String topicA = "topicA";
+ final String topicB = "topicB";
+
+ List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA,
0, NODES[0], NODES, NODES),
+ new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
+ new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new
PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
+ Cluster testCluster = new Cluster("clusterId", asList(NODES[0],
NODES[1], NODES[2]), allPartitions,
+ Collections.emptySet(), Collections.emptySet());
+
+ final Map<Integer, Integer> partitionCount = new HashMap<>();
+
+ Partitioner partitioner = new RoundRobinPartitioner();
+ for (int i = 0; i < 40; ++i) {
+ int partition = partitioner.partition(topicA, null, null, null,
null, testCluster);
+ // Simulate single-message batches
+ partitioner.onNewBatch(topicA, testCluster, partition);
+ int nextPartition = partitioner.partition(topicA, null, null,
null, null, testCluster);
+ assertEquals(partition, nextPartition, "New batch creation should
not affect the partition selection");
+ Integer count = partitionCount.get(partition);
+ if (null == count)
+ count = 0;
+ partitionCount.put(partition, count + 1);
+
+ if (i % 5 == 0) {
+ partitioner.partition(topicB, null, null, null, null,
testCluster);
+ }
+ }
+
+ assertEquals(10, partitionCount.get(0).intValue());
+ assertEquals(10, partitionCount.get(1).intValue());
+ assertEquals(10, partitionCount.get(2).intValue());
+ assertEquals(10, partitionCount.get(3).intValue());
+ }
}