This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 998bd90fba5 [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991)
998bd90fba5 is described below

commit 998bd90fba5bbc46286e486f601f34875ee8e528
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu Jul 4 07:02:26 2024 +0800

    [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991)
    
    (cherry picked from commit deb26f7662268def7f838f722de4a677b3d546ed)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 10 ++++-
 .../api/TokenExpirationProduceConsumerTest.java    | 45 +++++++++++++++++-----
 2 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7bf05857095..056679bc89c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1421,7 +1421,15 @@ public class NamespaceService implements AutoCloseable {
                         return FutureUtil.failedFuture(new ServiceUnitNotReadyException(
                                 "No broker was available to own " + topicName));
                     }
-                    return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+                    LookupData lookupData = lookupResult.get().getLookupData();
+                    String brokerUrl;
+                    if (pulsar.getConfiguration().isBrokerClientTlsEnabled()
+                            && StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) {
+                        brokerUrl = lookupData.getBrokerUrlTls();
+                    } else {
+                        brokerUrl = lookupData.getBrokerUrl();
+                    }
+                    return pulsarClient.getLookup(brokerUrl)
                         .getPartitionedTopicMetadata(topicName, false)
                         .thenApply(metadata -> true)
                         .exceptionallyCompose(ex -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
index 4fc0d315d22..eb3056307d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
@@ -18,11 +18,23 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import java.nio.charset.StandardCharsets;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -40,15 +52,6 @@ import org.mockito.internal.util.MockUtil;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import javax.crypto.SecretKey;
-import java.time.Duration;
-import java.util.Base64;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 @Test(groups = "broker-api")
 @Slf4j
@@ -114,6 +117,7 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase
         conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
         conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
         conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+        conf.setBrokerClientTlsEnabled(true);
         conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
                 + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
     }
@@ -139,6 +143,29 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase
         return clientBuilder.build();
     }
 
+    @Test
+    public void testNonPersistentTopic() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = getClient(ADMIN_TOKEN);
+
+        String topic = "non-persistent://" + namespaceName + "/test-token-non-persistent";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test").subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(msg);
+
+        Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(receive);
+        assertEquals(receive.getData(), msg);
+    }
+
     @Test
     public void testTokenExpirationProduceConsumer() throws Exception {
         Calendar calendar = Calendar.getInstance();

Reply via email to