This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b5fdeff5444b2f7ca2da0dea229a1d1583df9d9d
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Jun 6 16:09:38 2024 +0800

    [improve] [client] PIP-344 support feature flag 
supportsGetPartitionedMetadataWithoutAutoCreation (#22773)
    
    (cherry picked from commit 6236116754472c61b2166da6d4797fc63c83f364)
---
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 44 ++++++++++++++++++++++
 .../client/impl/BinaryProtoLookupService.java      |  6 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +++
 .../apache/pulsar/common/protocol/Commands.java    |  1 +
 4 files changed, 56 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index dfd52d494ae..df6b1b8a8f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -20,13 +20,17 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Sets;
 import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -124,4 +128,44 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
         consumer.close();
     }
+
+    @Test
+    public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName( "persistent://" + 
NAMESPACE + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) 
PulsarClient.builder()
+                .maxNumberOfRejectedRequestPerConnection(1)
+                .connectionMaxIdleSeconds(Integer.MAX_VALUE)
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build();
+        ProducerImpl producer = (ProducerImpl) 
clientWitBinaryLookup.newProducer().topic(topic).create();
+
+        // Verify: the variable 
"isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker 
is true.
+        Awaitility.await().untilAsserted(() -> {
+            ClientCnx clientCnx = producer.getClientCnx();
+            Assert.assertNotNull(clientCnx);
+            
Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation());
+        });
+        Assert.assertEquals(
+                clientWitBinaryLookup.getPartitionsForTopic(topic, 
true).get().size(), 1);
+
+        // Inject a "false" value for the variable 
"isSupportsGetPartitionedMetadataWithoutAutoCreation".
+        // Verify: client will get a not support error.
+        Field field = 
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+        field.setAccessible(true);
+        for (CompletableFuture<ClientCnx> clientCnxFuture : 
clientWitBinaryLookup.getCnxPool().getConnections()) {
+            field.set(clientCnxFuture.get(), false);
+        }
+        try {
+            clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+            Assert.fail("Expected an error that the broker version is too 
old.");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex.getMessage().contains("without auto-creation 
is not supported from the broker"));
+        }
+
+        // cleanup.
+        producer.close();
+        clientWitBinaryLookup.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index b363d6e4366..bf015c564b9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -254,6 +254,12 @@ public class BinaryProtoLookupService implements 
LookupService {
         CompletableFuture<PartitionedTopicMetadata> partitionFuture = new 
CompletableFuture<>();
 
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+            if (!metadataAutoCreationEnabled && 
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
+                partitionFuture.completeExceptionally(new 
PulsarClientException.NotSupportedException("The feature of"
+                        + " getting partitions without auto-creation is not 
supported from the broker,"
+                        + " please upgrade the broker to the latest 
version."));
+                return;
+            }
             long requestId = client.newRequestId();
             ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
                     metadataAutoCreationEnabled);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 03e0f406dd2..6f343a2ee58 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -191,6 +191,8 @@ public class ClientCnx extends PulsarHandler {
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
     private boolean supportsTopicWatchers;
+    @Getter
+    private boolean supportsGetPartitionedMetadataWithoutAutoCreation;
 
     /** Idle stat. **/
     @Getter
@@ -400,6 +402,9 @@ public class ClientCnx extends PulsarHandler {
 
         supportsTopicWatchers =
             connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers();
+        supportsGetPartitionedMetadataWithoutAutoCreation =
+            connected.hasFeatureFlags()
+                    && 
connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation();
 
         // set remote protocol version to the correct version before we 
complete the connection future
         setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 53907e61914..4822dba023d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -301,6 +301,7 @@ public class Commands {
         connected.setProtocolVersion(versionToAdvertise);
 
         
connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers);
+        
connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
         return cmd;
     }
 

Reply via email to