This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 998bd90fba5 [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991) 998bd90fba5 is described below commit 998bd90fba5bbc46286e486f601f34875ee8e528 Author: Baodi Shi <ba...@apache.org> AuthorDate: Thu Jul 4 07:02:26 2024 +0800 [fix][broker] Can't connecte to non-persist topic when enable broker client tls (#22991) (cherry picked from commit deb26f7662268def7f838f722de4a677b3d546ed) --- .../pulsar/broker/namespace/NamespaceService.java | 10 ++++- .../api/TokenExpirationProduceConsumerTest.java | 45 +++++++++++++++++----- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 7bf05857095..056679bc89c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1421,7 +1421,15 @@ public class NamespaceService implements AutoCloseable { return FutureUtil.failedFuture(new ServiceUnitNotReadyException( "No broker was available to own " + topicName)); } - return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl()) + LookupData lookupData = lookupResult.get().getLookupData(); + String brokerUrl; + if (pulsar.getConfiguration().isBrokerClientTlsEnabled() + && StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) { + brokerUrl = lookupData.getBrokerUrlTls(); + } else { + brokerUrl = lookupData.getBrokerUrl(); + } + return pulsarClient.getLookup(brokerUrl) .getPartitionedTopicMetadata(topicName, false) .thenApply(metadata -> true) .exceptionallyCompose(ex -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java index 4fc0d315d22..eb3056307d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java @@ -18,11 +18,23 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; +import java.time.Duration; +import java.util.Base64; +import java.util.Calendar; +import java.util.Date; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import java.nio.charset.StandardCharsets; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -40,15 +52,6 @@ import org.mockito.internal.util.MockUtil; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import javax.crypto.SecretKey; -import java.time.Duration; -import java.util.Base64; -import java.util.Calendar; -import java.util.Date; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; @Test(groups = "broker-api") @Slf4j @@ -114,6 +117,7 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); + conf.setBrokerClientTlsEnabled(true); conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); } @@ -139,6 +143,29 @@ public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase return clientBuilder.build(); } + @Test + public void testNonPersistentTopic() throws Exception { + + @Cleanup + PulsarClient pulsarClient = getClient(ADMIN_TOKEN); + + String topic = "non-persistent://" + namespaceName + "/test-token-non-persistent"; + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test").subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8); + producer.send(msg); + + Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals(receive.getData(), msg); + } + @Test public void testTokenExpirationProduceConsumer() throws Exception { Calendar calendar = Calendar.getInstance();