This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5a1fa0c5c67 [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
5a1fa0c5c67 is described below

commit 5a1fa0c5c6709bdb7b003ce12d00abf52da293e1
Author: Cong Zhao <zhaoc...@apache.org>
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)
    
    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 30432aaef73..2b2cbc5fac2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2389,9 +2389,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2400,7 +2401,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", 
topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..e10b45868bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+
+        Pattern pattern = Pattern.compile(topicPrefix + "-.*");
+        @Cleanup
+        Consumer<GenericRecord> consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topicsPattern(pattern)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        List<ConsumerImpl<GenericRecord>> consumers =
+                ((MultiTopicsConsumerImpl<GenericRecord>) 
consumer).getConsumers();
+        Assert.assertEquals(topicNums, consumers.size());
+
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().delete(topic, true);
+        }
+    }
+
     @Test
     public void testMultiTopicSetSchemaProvider() throws Exception {
         final String tenant = PUBLIC_TENANT;

Reply via email to