This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e59d0e164d [fix][client] Make auto partitions update work for old 
brokers without PIP-344 (#24822)
5e59d0e164d is described below

commit 5e59d0e164d5c255021302394d9ad70583fa565a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 8 17:56:40 2025 +0800

    [fix][client] Make auto partitions update work for old brokers without 
PIP-344 (#24822)
---
 pip/pip-344.md                                     |  7 ++-
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  2 +-
 .../backwardscompatibility/ClientTest25.java       | 54 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/pip/pip-344.md b/pip/pip-344.md
index 5eafc6fd5c2..120a8e1ef0d 100644
--- a/pip/pip-344.md
+++ b/pip/pip-344.md
@@ -122,6 +122,9 @@ message FeatureFlags {
 
 # Backward & Forward Compatibility
 
-- Old version client and New version Broker: The client will call the old API.
+Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client 
will call the old API.
 
-- New version client and Old version Broker: The feature flag 
`supports_binary_api_get_partitioned_meta_with_param_created_false` will be 
`false`. The client will get a not-support error if the param 
`createIfAutoCreationEnabled` is false.
+New version client and Old version Broker: The feature flag 
`supports_binary_api_get_partitioned_meta_with_param_created_false` will be 
`false`. The client will get a not-supported error if the param 
`createIfAutoCreationEnabled` is false in the following cases:
+- The topic is a DLQ topic
+- The topic is non-persistent
+- The topic is geo-replicated, with the local cluster on the new version and 
the remote cluster on the old version
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 3b30f0011e5..6e9bc4595a0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -260,7 +260,7 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
             field.set(clientCnxFuture.get(), false);
         }
         try {
-            clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+            clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false, 
false).join();
             Assert.fail("Expected an error that the broker version is too 
old.");
         } catch (Exception ex) {
             Assert.assertTrue(ex.getMessage().contains("without auto-creation 
is not supported by the broker"));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6ffdfa55a9b..5f5239131a8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1291,7 +1291,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     @Override
     public CompletableFuture<List<String>> getPartitionsForTopic(String topic, 
boolean metadataAutoCreationEnabled) {
-        return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, 
false).thenApply(metadata -> {
+        return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, 
true).thenApply(metadata -> {
             if (metadata.partitions > 0) {
                 TopicName topicName = TopicName.get(topic);
                 List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
index 2e153b4ec7f..9fcbf667659 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
@@ -19,8 +19,19 @@
 package org.apache.pulsar.tests.integration.backwardscompatibility;
 
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ClientTest25 extends PulsarStandaloneTestSuite25 {
@@ -34,4 +45,47 @@ public class ClientTest25 extends 
PulsarStandaloneTestSuite25 {
         clientTestBase.resetCursorCompatibility(serviceUrl.get(), 
httpServiceUrl.get(), topicName);
     }
 
+    @Test(timeOut = 20000)
+    public void testAutoPartitionsUpdate() throws Exception {
+        @Cleanup final var pulsarClient = PulsarClient.builder()
+                .serviceUrl(getContainer().getPlainTextServiceUrl())
+                .build();
+        final var topic = "test-auto-part-update";
+        final var topic2 = "dummy-topic";
+        @Cleanup final var admin = 
PulsarAdmin.builder().serviceHttpUrl(getContainer().getHttpServiceUrl()).build();
+        // Use 2 as the initial partition number because old version broker 
cannot update partitions on a topic that
+        // has only 1 partition.
+        admin.topics().createPartitionedTopic(topic, 2);
+        admin.topics().createPartitionedTopic(topic2, 2);
+        @Cleanup final var producer = 
pulsarClient.newProducer().autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return metadata.numPartitions() - 1;
+                    }
+                })
+                .topic(topic)
+                .create();
+        @Cleanup final var consumer = 
pulsarClient.newConsumer().autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .topic(topic).subscriptionName("sub")
+                .subscribe();
+        @Cleanup final var multiTopicsConsumer = 
pulsarClient.newConsumer().autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .topics(List.of(topic, 
topic2)).subscriptionName("sub-2").subscribe();
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Thread.sleep(1500);
+        final var msgId = (MessageIdAdv) producer.send("msg".getBytes());
+        Assert.assertEquals(msgId.getPartitionIndex(), 2);
+
+        final var msg = consumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        Assert.assertEquals(((MessageIdAdv) 
msg.getMessageId()).getPartitionIndex(), 2);
+        final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg2);
+        Assert.assertEquals(msg2.getMessageId(), msg.getMessageId());
+    }
 }

Reply via email to