This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 14f0e55a89f [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991) 14f0e55a89f is described below commit 14f0e55a89ff1befddb893e7353271617c834253 Author: Baodi Shi <ba...@apache.org> AuthorDate: Thu Jul 4 07:02:26 2024 +0800 [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991) (cherry picked from commit deb26f7662268def7f838f722de4a677b3d546ed) --- .../pulsar/broker/namespace/NamespaceService.java | 10 +++++++- .../api/TokenExpirationProduceConsumerTest.java | 27 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index dfd03dfbc6e..2a1584df961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1471,7 +1471,15 @@ public class NamespaceService implements AutoCloseable { return FutureUtil.failedFuture(new ServiceUnitNotReadyException( "No broker was available to own " + topicName)); } - return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl()) + LookupData lookupData = lookupResult.get().getLookupData(); + String brokerUrl; + if (pulsar.getConfiguration().isBrokerClientTlsEnabled() + && StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) { + brokerUrl = lookupData.getBrokerUrlTls(); + } else { + brokerUrl = lookupData.getBrokerUrl(); + } + return pulsarClient.getLookup(brokerUrl) .getPartitionedTopicMetadata(topicName, false) .thenApply(metadata -> true) .exceptionallyCompose(ex -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java index fa9099f3d2f..d8ed1055720 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; @@ -32,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.crypto.SecretKey; +import java.nio.charset.StandardCharsets; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -107,6 +110,7 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); + conf.setBrokerClientTlsEnabled(true); conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); } @@ -132,6 +136,29 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase return clientBuilder.build(); } + @Test + public void testNonPersistentTopic() throws Exception { + + @Cleanup + PulsarClient pulsarClient = getClient(ADMIN_TOKEN); + + String topic = "non-persistent://" + namespaceName + "/test-token-non-persistent"; + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test").subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8); + producer.send(msg); + + Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals(receive.getData(), msg); + } + @Test public void testTokenExpirationProduceConsumer() throws Exception { Calendar calendar = Calendar.getInstance();