This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new deb26f76622 [fix][broker] Can't connect to non-persistent topic when 
broker client TLS is enabled (#22991)
deb26f76622 is described below

commit deb26f7662268def7f838f722de4a677b3d546ed
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu Jul 4 07:02:26 2024 +0800

    [fix][broker] Can't connect to non-persistent topic when broker client TLS 
is enabled (#22991)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 10 +++++++-
 .../api/TokenExpirationProduceConsumerTest.java    | 27 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dfd03dfbc6e..2a1584df961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1471,7 +1471,15 @@ public class NamespaceService implements AutoCloseable {
                         return FutureUtil.failedFuture(new 
ServiceUnitNotReadyException(
                                 "No broker was available to own " + 
topicName));
                     }
-                    return 
pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+                    LookupData lookupData = lookupResult.get().getLookupData();
+                    String brokerUrl;
+                    if (pulsar.getConfiguration().isBrokerClientTlsEnabled()
+                            && 
StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) {
+                        brokerUrl = lookupData.getBrokerUrlTls();
+                    } else {
+                        brokerUrl = lookupData.getBrokerUrl();
+                    }
+                    return pulsarClient.getLookup(brokerUrl)
                         .getPartitionedTopicMetadata(topicName, false)
                         .thenApply(metadata -> true)
                         .exceptionallyCompose(ex -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
index fa9099f3d2f..d8ed1055720 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
@@ -32,6 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.crypto.SecretKey;
+import java.nio.charset.StandardCharsets;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -107,6 +110,7 @@ public class TokenExpirationProduceConsumerTest extends 
TlsProducerConsumerBase
         
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
         conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+        conf.setBrokerClientTlsEnabled(true);
         conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
                 + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
     }
@@ -132,6 +136,29 @@ public class TokenExpirationProduceConsumerTest extends 
TlsProducerConsumerBase
         return clientBuilder.build();
     }
 
+    @Test
+    public void testNonPersistentTopic() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = getClient(ADMIN_TOKEN);
+
+        String topic = "non-persistent://" + namespaceName + 
"/test-token-non-persistent";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test").subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(msg);
+
+        Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(receive);
+        assertEquals(receive.getData(), msg);
+    }
+
     @Test
     public void testTokenExpirationProduceConsumer() throws Exception {
         Calendar calendar = Calendar.getInstance();

Reply via email to