codelipenghui commented on code in PR #22206:
URL: https://github.com/apache/pulsar/pull/22206#discussion_r1606411321


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -300,6 +300,7 @@ message FeatureFlags {
   optional bool supports_broker_entry_metadata = 2 [default = false];
   optional bool supports_partial_producer = 3 [default = false];
   optional bool supports_topic_watchers = 4 [default = false];
+  optional bool supports_get_partitioned_meta_v2 = 5 [default = false];

Review Comment:
   ```suggestion
     optional bool supports_get_partitioned_metadata_without_auto_creation = 5 
[default = false];
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java:
##########
@@ -59,12 +59,19 @@ public interface LookupService extends AutoCloseable {
     CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);
 
     /**
-     * Returns {@link PartitionedTopicMetadata} for a given topic.
-     *
-     * @param topicName topic-name
-     * @return
+     * 1.Get the partitions if the topic exists. Return "{partition: n}" if a 
partitioned topic exists;
+     *  return "{partition: 0}" if a non-partitioned topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false," neither 
partitioned topic nor non-partitioned topic
+     *   does not exist. You will get an {@link 
PulsarClientException.NotFoundException}.
+     *  2-1. You will get a {@link 
PulsarClientException.NotSupportedException} if the broker's version is an older
+     *   one that does not support this feature and the Pulsar client is using 
a binary protocol "serviceUrl".
+     * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger 
an auto-creation for this topic(using
+     *  the default topic auto-creation strategy you set for the broker), and 
the corresponding result is returned.
+     *  For the result, see case 1.
+     * @version 3.3.0.
      */
-    CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName);

Review Comment:
   Is it better to keep this method for compatibility? Instead, we can provide 
a default implementation to always use `metadataAutoCreationEnabled=true`. And 
add @Deprecated annotation



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+@Slf4j
+public class GetPartitionMetadataTest extends ProducerConsumerBase {
+
+    private static final String DEFAULT_NS = "public/default";
+
+    private PulsarClientImpl clientWithHttpLookup;
+    private PulsarClientImpl clientWitBinaryLookup;
+
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        clientWithHttpLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        clientWitBinaryLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (clientWithHttpLookup != null) {
+            clientWithHttpLookup.close();
+        }
+        if (clientWitBinaryLookup != null) {
+            clientWitBinaryLookup.close();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+    }
+
+    private LookupService getLookupService(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return clientWithHttpLookup.getLookup();
+        } else {
+            return clientWitBinaryLookup.getLookup();
+        }
+    }
+
+    @Test
+    public void testAutoCreatingMetadataWhenCallingOldAPI() throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+
+        // HTTP client.
+        final String tp1 = BrokerTestUtil.newUniqueName("persistent://" + 
DEFAULT_NS + "/tp");
+        clientWithHttpLookup.getPartitionsForTopic(tp1).join();
+        Optional<PartitionedTopicMetadata> metadata1 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp1), 
true).join();
+        assertTrue(metadata1.isPresent());
+        assertEquals(metadata1.get().partitions, 3);
+
+        // Binary client.
+        final String tp2 = BrokerTestUtil.newUniqueName("persistent://" + 
DEFAULT_NS + "/tp");
+        clientWitBinaryLookup.getPartitionsForTopic(tp2).join();
+        Optional<PartitionedTopicMetadata> metadata2 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp2), 
true).join();
+        assertTrue(metadata2.isPresent());
+        assertEquals(metadata2.get().partitions, 3);
+
+        // Cleanup.
+        admin.topics().deletePartitionedTopic(tp1, false);
+        admin.topics().deletePartitionedTopic(tp2, false);
+    }
+
+    @DataProvider(name = "autoCreationParamsAll")
+    public Object[][] autoCreationParamsAll(){
+        return new Object[][]{
+            // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, 
isUsingHttpLookup.
+            {true, true, true},
+            {true, true, false},
+            {true, false, true},
+            {true, false, false},
+            {false, true, true},
+            {false, true, false},
+            {false, false, true},
+            {false, false, false}
+        };
+    }
+
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfNonPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                           boolean 
paramMetadataAutoCreationEnabled,
+                                                            boolean 
isUsingHttpLookup) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        admin.topics().createNonPartitionedTopic(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response =
+                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
+        assertEquals(response.partitions, 0);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        for (int i = 0; i < 3; i++) {
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+        // Cleanup.
+        client.close();
+        admin.topics().delete(topicNameStr, false);
+    }
+
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                        boolean 
paramMetadataAutoCreationEnabled,
+                                                        boolean 
isUsingHttpLookup) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        admin.topics().createPartitionedTopic(topicNameStr, 3);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response =
+                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
+        assertEquals(response.partitions, 3);
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+        // Cleanup.
+        client.close();
+        admin.topics().deletePartitionedTopic(topicNameStr, false);
+    }
+
+    @DataProvider(name = "clients")
+    public Object[][] clients(){
+        return new Object[][]{
+                // isUsingHttpLookup.
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "clients")
+    public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup) 
throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
+        assertEquals(response.partitions, 3);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertTrue(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+        for (int i = 0; i < 3; i++) {
+            // The API "getPartitionedTopicMetadata" only creates the 
partitioned metadata, it will not create the
+            // partitions.
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+        // Cleanup.
+        client.close();
+        admin.topics().deletePartitionedTopic(topicNameStr, false);
+    }
+
+    @Test(dataProvider = "clients")
+    public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup) 
throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
+        assertEquals(response.partitions, 0);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+        // Cleanup.
+        client.close();
+    }
+
+    @DataProvider(name = "autoCreationParamsNotAllow")
+    public Object[][] autoCreationParamsNotAllow(){
+        return new Object[][]{
+                // configAllowAutoTopicCreation, 
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
+                {true, false, true},
+                {true, false, false},
+                // {false, true, true},
+                // {false, true, false},

Review Comment:
   Any reason for adding `//` for these 2 line?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,6 +609,44 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
+                    // Get if exists, respond not found error if not exists.
+                    if (!partitionMetadata.isMetadataAutoCreationEnabled()) {
+                        final NamespaceResources namespaceResources = 
getBrokerService().pulsar().getPulsarResources()
+                                .getNamespaceResources();
+                        final TopicResources topicResources = 
getBrokerService().pulsar().getPulsarResources()
+                                .getTopicResources();
+                        return 
namespaceResources.getPartitionedTopicResources()

Review Comment:
   return is not required.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,6 +609,44 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
+                    // Get if exists, respond not found error if not exists.
+                    if (!partitionMetadata.isMetadataAutoCreationEnabled()) {
+                        final NamespaceResources namespaceResources = 
getBrokerService().pulsar().getPulsarResources()
+                                .getNamespaceResources();
+                        final TopicResources topicResources = 
getBrokerService().pulsar().getPulsarResources()
+                                .getTopicResources();
+                        return 
namespaceResources.getPartitionedTopicResources()
+                            .getPartitionedTopicMetadataAsync(topicName, false)
+                            .thenAccept(metadata -> {
+                                if (metadata.isPresent()) {
+                                    
commandSender.sendPartitionMetadataResponse(metadata.get().partitions, 
requestId);
+                                    lookupSemaphore.release();
+                                    return;
+                                }
+                                if (topicName.isPersistent()) {
+                                    
topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
+                                        if (exists) {
+                                            
commandSender.sendPartitionMetadataResponse(0, requestId);
+                                            lookupSemaphore.release();
+                                            return;
+                                        }
+                                        
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
+                                                        "", requestId));
+                                        lookupSemaphore.release();
+                                    }).exceptionally(ex -> {
+                                        log.error("{} {} Failed to get 
partition metadata", topicName,
+                                                ctx.channel(), ex);
+                                        writeAndFlush(
+                                                
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+                                                "Failed to check partition 
metadata",
+                                                requestId));
+                                        lookupSemaphore.release();
+                                        return null;
+                                    });
+                                }
+                            });

Review Comment:
   add .exceptionally() here to cover the exceptions from 
`.getPartitionedTopicMetadataAsync`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+@Slf4j
+public class GetPartitionMetadataTest extends ProducerConsumerBase {
+
+    private static final String DEFAULT_NS = "public/default";
+
+    private PulsarClientImpl clientWithHttpLookup;
+    private PulsarClientImpl clientWitBinaryLookup;
+
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        clientWithHttpLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        clientWitBinaryLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (clientWithHttpLookup != null) {
+            clientWithHttpLookup.close();
+        }
+        if (clientWitBinaryLookup != null) {
+            clientWitBinaryLookup.close();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+    }
+
+    private LookupService getLookupService(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return clientWithHttpLookup.getLookup();
+        } else {
+            return clientWitBinaryLookup.getLookup();
+        }
+    }
+
+    @Test
+    public void testAutoCreatingMetadataWhenCallingOldAPI() throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+
+        // HTTP client.
+        final String tp1 = BrokerTestUtil.newUniqueName("persistent://" + 
DEFAULT_NS + "/tp");
+        clientWithHttpLookup.getPartitionsForTopic(tp1).join();
+        Optional<PartitionedTopicMetadata> metadata1 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp1), 
true).join();
+        assertTrue(metadata1.isPresent());
+        assertEquals(metadata1.get().partitions, 3);
+
+        // Binary client.
+        final String tp2 = BrokerTestUtil.newUniqueName("persistent://" + 
DEFAULT_NS + "/tp");
+        clientWitBinaryLookup.getPartitionsForTopic(tp2).join();
+        Optional<PartitionedTopicMetadata> metadata2 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp2), 
true).join();
+        assertTrue(metadata2.isPresent());
+        assertEquals(metadata2.get().partitions, 3);
+
+        // Cleanup.
+        admin.topics().deletePartitionedTopic(tp1, false);
+        admin.topics().deletePartitionedTopic(tp2, false);
+    }
+
+    @DataProvider(name = "autoCreationParamsAll")
+    public Object[][] autoCreationParamsAll(){
+        return new Object[][]{
+            // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, 
isUsingHttpLookup.
+            {true, true, true},
+            {true, true, false},
+            {true, false, true},
+            {true, false, false},
+            {false, true, true},
+            {false, true, false},
+            {false, false, true},
+            {false, false, false}
+        };
+    }
+
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfNonPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                           boolean 
paramMetadataAutoCreationEnabled,
+                                                            boolean 
isUsingHttpLookup) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        admin.topics().createNonPartitionedTopic(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response =
+                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
+        assertEquals(response.partitions, 0);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        for (int i = 0; i < 3; i++) {
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }

Review Comment:
   There is no partitioned topic created. Why do we need to check the 
partitioned topics?



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java:
##########
@@ -308,14 +308,32 @@ static ClientBuilder builder() {
      *
      * <p>This can be used to discover the partitions and create {@link 
Reader}, {@link Consumer} or {@link Producer}
      * instances directly on a particular partition.
-     *
+     * @Deprecated it is not suggested to use now; please use {@link 
#getPartitionsForTopic(String, boolean)}.
      * @param topic
      *            the topic name
      * @return a future that will yield a list of the topic partitions or 
{@link PulsarClientException} if there was any
      *         error in the operation.
+     *
      * @since 2.3.0
      */
-    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+    @Deprecated
+    default CompletableFuture<List<String>> getPartitionsForTopic(String 
topic) {
+        return getPartitionsForTopic(topic, true);
+    }
+
+    /**
+     * 1. Get the partitions if the topic exists. Return "[{partition-0}, 
{partition-1}....{partition-n}}]" if a
+     *   partitioned topic exists; return "[{topic}]" if a non-partitioned 
topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false", neither the 
partitioned topic nor non-partitioned
+     *   topic does not exist. You will get an {@link 
PulsarClientException.NotFoundException}.
+     *  2-1. You will get a {@link 
PulsarClientException.NotSupportedException} if the broker's version is an older
+     *    one that does not support this feature and the Pulsar client is 
using a binary protocol "serviceUrl".

Review Comment:
   `this feature` will be a confusing term. I think the correct description 
should be
   
   ```
   You will get a {@link PulsarClientException.NotSupportedException} with 
metadataAutoCreationEnabled=false on an old broker version which does not 
support getting partitions without partitioned metadata auto-creation.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to