This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 5632b4b [improve] change RoutingMode default from UseSinglePartition
to RoundRobinDistribution (#507)
5632b4b is described below
commit 5632b4b9107eed0911981d46054f445cbf80ebf3
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Oct 13 11:08:20 2025 +0800
[improve] change RoutingMode default from UseSinglePartition to
RoundRobinDistribution (#507)
---
include/pulsar/ProducerConfiguration.h | 2 +-
lib/ProducerConfigurationImpl.h | 2 +-
tests/BasicEndToEndTest.cc | 4 +++-
tests/ProducerConfigurationTest.cc | 6 +++---
tests/ReaderTest.cc | 21 ++++++++++++++++-----
5 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
index 1724fff..9e2a06d 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -237,7 +237,7 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Set the message routing modes for partitioned topics.
*
- * Default: UseSinglePartition
+ * Default: RoundRobinDistribution
*
* @param PartitionsRoutingMode partition routing mode.
* @return
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index c635c48..c324020 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -34,7 +34,7 @@ struct ProducerConfigurationImpl {
CompressionType compressionType{CompressionNone};
int maxPendingMessages{1000};
int maxPendingMessagesAcrossPartitions{50000};
- ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
+ ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index c269538..4330609 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -1697,9 +1697,11 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
std::string subName = "sub-testSeekOnPartitionedTopic";
Producer producer;
+ ProducerConfiguration conf;
+ conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
Promise<Result, Producer> producerPromise;
- client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc
index df5867c..b6abea3 100644
--- a/tests/ProducerConfigurationTest.cc
+++ b/tests/ProducerConfigurationTest.cc
@@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone);
ASSERT_EQ(conf.getMaxPendingMessages(), 1000);
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000);
- ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{});
ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash);
ASSERT_EQ(conf.getBlockIfQueueFull(), false);
@@ -88,8 +88,8 @@ TEST(ProducerConfigurationTest, testCustomConfig) {
conf.setMaxPendingMessagesAcrossPartitions(100000);
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000);
- conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
- ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
+ conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
const auto router = std::make_shared<MockMessageRoutingPolicy>();
conf.setMessageRouter(router);
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 0371bac..3da25e9 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -67,7 +67,9 @@ TEST_P(ReaderTest, testSimpleReader) {
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+ ProducerConfiguration producerConf;
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
@@ -142,7 +144,9 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
initTopic(topicName);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+ ProducerConfiguration producerConf;
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
@@ -176,7 +180,9 @@ TEST_P(ReaderTest, testMultipleReaders) {
initTopic(topicName);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+ ProducerConfiguration producerConf;
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
@@ -223,7 +229,9 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {
initTopic(topicName);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+ ProducerConfiguration producerConf;
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
@@ -263,7 +271,9 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
initTopic(topicName);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+ ProducerConfiguration producerConf;
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
@@ -459,6 +469,7 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
Producer producer;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
+ producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
// 2. create reader, and expect hasMessageAvailable return false since no message produced.