This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 708fefff345 [fix][client] Make auto partitions update work for old
brokers without PIP-344 (#24822)
708fefff345 is described below
commit 708fefff345118d5e5857ba5bc52596a297fa905
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 8 17:56:40 2025 +0800
[fix][client] Make auto partitions update work for old brokers without
PIP-344 (#24822)
(cherry picked from commit 5e59d0e164d5c255021302394d9ad70583fa565a)
---
.../apache/pulsar/client/impl/ClientCnxTest.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../backwardscompatibility/ClientTest2_2.java | 54 ++++++++++++++++++++++
3 files changed, 56 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index bdff97dbb3b..4f4f0a750da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -156,7 +156,7 @@ public class ClientCnxTest extends
MockedPulsarServiceBaseTest {
field.set(clientCnxFuture.get(), false);
}
try {
- clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+ clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false,
false).join();
Assert.fail("Expected an error that the broker version is too
old.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("without auto-creation
is not supported by the broker"));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1362e744301..6c749aa9131 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1216,7 +1216,7 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic,
boolean metadataAutoCreationEnabled) {
- return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled,
false).thenApply(metadata -> {
+ return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled,
true).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java
index 212ac48b638..e4e32fa89b1 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java
@@ -19,8 +19,19 @@
package org.apache.pulsar.tests.integration.backwardscompatibility;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class ClientTest2_2 extends PulsarStandaloneTestSuite2_2 {
@@ -33,4 +44,47 @@ public class ClientTest2_2 extends
PulsarStandaloneTestSuite2_2 {
clientTestBase.resetCursorCompatibility(serviceUrl.get(),
httpServiceUrl.get(), topicName);
}
+ @Test(timeOut = 20000)
+ public void testAutoPartitionsUpdate() throws Exception {
+ @Cleanup final var pulsarClient = PulsarClient.builder()
+ .serviceUrl(getContainer().getPlainTextServiceUrl())
+ .build();
+ final var topic = "test-auto-part-update";
+ final var topic2 = "dummy-topic";
+ @Cleanup final var admin =
PulsarAdmin.builder().serviceHttpUrl(getContainer().getHttpServiceUrl()).build();
+ // Use 2 as the initial partition number because old version broker
cannot update partitions on a topic that
+ // has only 1 partition.
+ admin.topics().createPartitionedTopic(topic, 2);
+ admin.topics().createPartitionedTopic(topic2, 2);
+ @Cleanup final var producer =
pulsarClient.newProducer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return metadata.numPartitions() - 1;
+ }
+ })
+ .topic(topic)
+ .create();
+ @Cleanup final var consumer =
pulsarClient.newConsumer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .topic(topic).subscriptionName("sub")
+ .subscribe();
+ @Cleanup final var multiTopicsConsumer =
pulsarClient.newConsumer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .topics(List.of(topic,
topic2)).subscriptionName("sub-2").subscribe();
+
+ admin.topics().updatePartitionedTopic(topic, 3);
+ Thread.sleep(1500);
+ final var msgId = (MessageIdAdv) producer.send("msg".getBytes());
+ Assert.assertEquals(msgId.getPartitionIndex(), 2);
+
+ final var msg = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(((MessageIdAdv)
msg.getMessageId()).getPartitionIndex(), 2);
+ final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg2);
+ Assert.assertEquals(msg2.getMessageId(), msg.getMessageId());
+ }
}