This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 941cda992037f74a5c2e26b2463840ac101126c8 Author: Lari Hotari <[email protected]> AuthorDate: Tue Jul 22 19:55:59 2025 +0300 [fix][misc] Fix topics pattern consumer backwards compatibility (#24537) (cherry picked from commit c358e7135e0b115dfb7aee5617ed1c90b6996e78) --- conf/broker.conf | 10 ++ conf/standalone.conf | 10 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 15 +++ .../apache/pulsar/broker/service/ServerCnx.java | 21 ++-- .../pulsar/broker/service/TopicListService.java | 39 ++++-- .../broker/service/TopicListServiceTest.java | 16 +-- .../broker/service/TopicListWatcherTest.java | 7 +- .../impl/PatternTopicsConsumerImplAuthTest.java | 2 +- .../client/impl/PatternTopicsConsumerImplTest.java | 28 ++--- .../common/naming/ServiceConfigurationTest.java | 19 +++ .../impl/PatternMultiTopicsConsumerImpl.java | 23 ++-- .../pulsar/client/impl/PulsarClientImpl.java | 5 +- .../pulsar/client/impl/TopicListWatcher.java | 10 +- .../impl/PatternMultiTopicsConsumerImplTest.java | 9 +- .../pulsar/client/impl/TopicListWatcherTest.java | 7 +- .../pulsar/common/lookup/GetTopicsResult.java | 4 +- .../pulsar/common/topics/JDKTopicsPattern.java | 46 +++++++ .../common/topics/MatchAllTopicsPattern.java | 38 ++++++ .../pulsar/common/topics/RE2JTopicsPattern.java | 41 +++++++ .../org/apache/pulsar/common/topics/TopicList.java | 24 ++-- .../apache/pulsar/common/topics/TopicsPattern.java | 84 +++++++++++++ .../pulsar/common/topics/TopicsPatternFactory.java | 103 ++++++++++++++++ .../apache/pulsar/common/topics/TopicListTest.java | 9 +- .../common/topics/TopicsPatternFactoryTest.java | 136 +++++++++++++++++++++ .../pulsar/common/topics/TopicsPatternTest.java | 74 +++++++++++ 25 files changed, 688 insertions(+), 92 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 5d4aeab64ad..2a7befafed3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -730,6 +730,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true # Longer patterns are rejected to avoid 
patterns that are crafted to overload the broker. subscriptionPatternMaxLength=50 +# The regular expression implementation to use for topic pattern matching. +# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and falls back to +# the JDK implementation for backwards compatibility reasons when the pattern compilation fails +# with the RE2/j library. +# RE2J is more performant but does not support all regex features (e.g. negative lookaheads). +# JDK uses the standard Java regex implementation which supports all features but can be slower. +# Bad or malicious regex patterns requiring extensive backtracking could cause high resource usage +# with RE2J_WITH_JDK_FALLBACK or JDK implementations. +topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/conf/standalone.conf b/conf/standalone.conf index 1aaef7ed36e..22d769c7db0 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -386,6 +386,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true # Longer patterns are rejected to avoid patterns that are crafted to overload the broker. subscriptionPatternMaxLength=50 +# The regular expression implementation to use for topic pattern matching. +# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and falls back to +# the JDK implementation for backwards compatibility reasons when the pattern compilation fails +# with the RE2/j library. +# RE2J is more performant but does not support all regex features (e.g. negative lookaheads). +# JDK uses the standard Java regex implementation which supports all features but can be slower. +# Bad or malicious regex patterns requiring extensive backtracking could cause high resource usage +# with RE2J_WITH_JDK_FALLBACK or JDK implementations. 
+topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK + ### --- Metadata Store --- ### # Whether we should enable metadata operations batching diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index df102a2f2aa..c8d65cb46af 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -51,6 +51,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.sasl.SaslConstants; +import org.apache.pulsar.common.topics.TopicsPattern; import org.apache.pulsar.common.util.DefaultPulsarSslFactory; import org.apache.pulsar.common.util.DirectMemoryUtils; import org.apache.pulsar.metadata.api.MetadataStoreFactory; @@ -1569,6 +1570,20 @@ public class ServiceConfiguration implements PulsarConfiguration { + "# NOTE: This flag will be removed in some major releases in the future.\n") private boolean strictTopicNameEnabled = false; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The regular expression implementation to use for topic pattern matching. \n" + + "RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and falls back to " + + "the JDK implementation for backwards compatibility reasons when the pattern compilation fails " + + "with the RE2/j library.\n" + + "RE2J is more performant but does not support all regex features (e.g. negative lookaheads). \n" + + "JDK uses the standard Java regex implementation which supports all features but can be slower.\n" + + "Bad or malicious regex patterns requiring extensive backtracing could cause high resource usage " + + "with RE2J_WITH_JDK_FALLBACK or JDK implementations." 
+ ) + private TopicsPattern.RegexImplementation topicsPatternRegexImplementation = + TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK; + @FieldContext( category = CATEGORY_SCHEMA, doc = "The schema compatibility strategy to use for system topics" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b0774513057..f68c851af3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -32,7 +32,6 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.re2j.Pattern; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -175,6 +174,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.StringInterner; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; @@ -252,6 +252,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long resumeThresholdPendingBytesPerThread; private final long connectionLivenessCheckTimeoutMillis; + private final TopicsPattern.RegexImplementation topicsPatternImplementation; // Tracks and limits number of bytes pending to be published from a single specific IO thread. 
static final class PendingBytesPerThreadTracker { @@ -354,6 +355,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength); this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; this.throttleTracker = new ServerCnxThrottleTracker(this); + topicsPatternImplementation = conf.getTopicsPatternRegexImplementation(); } @Override @@ -2521,7 +2523,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { filterTopics = true; - filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get()); + filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get(), + topicsPatternImplementation); } else { log.info("[{}] Subscription pattern provided [{}] was longer than maximum {}.", remoteAddress, topicsPattern.get(), maxSubscriptionPatternLength); @@ -3165,18 +3168,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { final long watcherId = commandWatchTopicList.getWatcherId(); final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); - Pattern topicsPattern = Pattern.compile(commandWatchTopicList.hasTopicsPattern() - ? TopicList.removeTopicDomainScheme(commandWatchTopicList.getTopicsPattern()) - : TopicList.ALL_TOPICS_PATTERN); - String topicsHash = commandWatchTopicList.hasTopicsHash() - ? 
commandWatchTopicList.getTopicsHash() : null; - final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { - topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, topicsPattern, - topicsHash, lookupSemaphore); + String topicsPatternString = commandWatchTopicList.hasTopicsPattern() + ? commandWatchTopicList.getTopicsPattern() : TopicList.ALL_TOPICS_PATTERN; + String topicsHash = commandWatchTopicList.hasTopicsHash() + ? commandWatchTopicList.getTopicsHash() : null; + topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, topicsPatternString, + topicsPatternImplementation, topicsHash, lookupSemaphore); } else { final String msg = "Proxy Client is not authorized to watchTopicList"; log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index 818188fd182..ef2ea284cf7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service; -import com.google.re2j.Pattern; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -33,6 +32,8 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; +import org.apache.pulsar.common.topics.TopicsPatternFactory; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import 
org.apache.pulsar.metadata.api.NotificationType; import org.slf4j.Logger; @@ -48,13 +49,13 @@ public class TopicListService { private final TopicListService topicListService; private final long id; /** The regexp for the topic name(not contains partition suffix). **/ - private final Pattern topicsPattern; + private final TopicsPattern topicsPattern; /*** * @param topicsPattern The regexp for the topic name(not contains partition suffix). */ public TopicListWatcher(TopicListService topicListService, long id, - Pattern topicsPattern, List<String> topics) { + TopicsPattern topicsPattern, List<String> topics) { this.topicListService = topicListService; this.id = id; this.topicsPattern = topicsPattern; @@ -71,7 +72,9 @@ public class TopicListService { @Override public void accept(String topicName, NotificationType notificationType) { String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName(); - if (topicsPattern.matcher(TopicList.removeTopicDomainScheme(partitionedTopicName)).matches()) { + String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName); + + if (topicsPattern.matches(domainLessTopicName)) { List<String> newTopics; List<String> deletedTopics; if (notificationType == NotificationType.Deleted) { @@ -120,12 +123,17 @@ public class TopicListService { } /*** - * @param topicsPattern The regexp for the topic name(not contains partition suffix). 
+ * @param topicsPatternString The regexp for the topic name */ - public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern, - String topicsHash, Semaphore lookupSemaphore) { + public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, + String topicsPatternString, + TopicsPattern.RegexImplementation topicsPatternRegexImplementation, + String topicsHash, + Semaphore lookupSemaphore) { + // remove the domain scheme from the topic pattern + topicsPatternString = TopicList.removeTopicDomainScheme(topicsPatternString); - if (!enableSubscriptionPatternEvaluation || topicsPattern.pattern().length() > maxSubscriptionPatternLength) { + if (!enableSubscriptionPatternEvaluation || topicsPatternString.length() > maxSubscriptionPatternLength) { String msg = "Unable to create topic list watcher: "; if (!enableSubscriptionPatternEvaluation) { msg += "Evaluating subscription patterns is disabled."; @@ -137,6 +145,19 @@ public class TopicListService { lookupSemaphore.release(); return; } + + TopicsPattern topicsPattern; + try { + topicsPattern = TopicsPatternFactory.create(topicsPatternString, topicsPatternRegexImplementation); + } catch (Exception e) { + log.warn("[{}] Unable to create topic list watcher: Invalid pattern: {} on namespace {}", + connection.toString(), topicsPatternString, namespaceName); + connection.getCommandSender().sendErrorResponse(requestId, ServerError.InvalidTopicName, + "Invalid topics pattern: " + e.getMessage()); + lookupSemaphore.release(); + return; + } + CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>(); CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture); @@ -201,7 +222,7 @@ public class TopicListService { * @param topicsPattern The regexp for the topic name(not contains partition suffix). 
*/ public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture, - NamespaceName namespace, long watcherId, Pattern topicsPattern) { + NamespaceName namespace, long watcherId, TopicsPattern topicsPattern) { namespaceService.getListOfPersistentTopics(namespace). thenApply(topics -> { TopicListWatcher watcher = new TopicListWatcher(this, watcherId, topicsPattern, topics); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java index 95643c5ae45..9109828c025 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import com.google.re2j.Pattern; import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; @@ -39,6 +38,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -50,6 +50,8 @@ public class TopicListServiceTest { private CompletableFuture<List<String>> topicListFuture; private Semaphore lookupSemaphore; private TopicResources topicResources; + private final TopicsPattern.RegexImplementation topicsPatternImplementation = + TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK; @BeforeMethod(alwaysRun = true) public void setup() throws Exception { @@ -80,8 +82,8 @@ public class TopicListServiceTest { 
NamespaceName.get("tenant/ns"), 13, 7, - Pattern.compile("persistent://tenant/ns/topic\\d"), - null, + "persistent://tenant/ns/topic\\d", + topicsPatternImplementation, null, lookupSemaphore); List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1"); String hash = TopicList.calculateHash(topics); @@ -98,8 +100,8 @@ public class TopicListServiceTest { NamespaceName.get("tenant/ns"), 13, 7, - Pattern.compile("persistent://tenant/ns/topic\\d"), - null, + "persistent://tenant/ns/topic\\d", + topicsPatternImplementation, null, lookupSemaphore); topicListFuture.completeExceptionally(new PulsarServerException("Error")); Assert.assertEquals(1, lookupSemaphore.availablePermits()); @@ -114,8 +116,8 @@ public class TopicListServiceTest { NamespaceName.get("tenant/ns"), 13, 7, - Pattern.compile("persistent://tenant/ns/topic\\d"), - null, + "persistent://tenant/ns/topic\\d", + topicsPatternImplementation, null, lookupSemaphore); List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1"); topicListFuture.complete(topics); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java index 086051d3043..884cdc0ef92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java @@ -21,11 +21,12 @@ package org.apache.pulsar.broker.service; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import com.google.re2j.Pattern; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; +import org.apache.pulsar.common.topics.TopicsPatternFactory; import 
org.apache.pulsar.metadata.api.NotificationType; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -40,7 +41,8 @@ public class TopicListWatcherTest { ); private static final long ID = 7; - private static final Pattern PATTERN = Pattern.compile("tenant/ns/topic\\d+"); + private static final TopicsPattern PATTERN = + TopicsPatternFactory.create("tenant/ns/topic\\d+", TopicsPattern.RegexImplementation.RE2J); private TopicListService topicListService; @@ -98,5 +100,4 @@ public class TopicListWatcherTest { Arrays.asList("persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2"), watcher.getMatchingTopics()); } - } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java index 061a6baa974..e441de3ea1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java @@ -204,7 +204,7 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase { assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX)); // 4. 
verify consumer - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 90031ff92ea..c1b96a058f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -227,7 +227,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { }); // 4. verify consumer get methods, to get right number of partitions and topics. - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); @@ -311,7 +311,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { }); // 4. verify consumer get methods, to get right number of partitions and topics. 
- assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); @@ -394,7 +394,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { }); // 4. verify consumer get methods, to get right number of partitions and topics. - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); @@ -492,7 +492,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { }); // 4. verify consumer get methods, to get right number of partitions and topics. - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); @@ -569,7 +569,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { }); // 3. verify consumer get methods, to get 5 number of partitions and topics. 
- assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 5); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 5); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2); @@ -593,7 +593,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 5. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. Awaitility.await().untilAsserted(() -> { - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2); @@ -665,7 +665,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the // changes to update(CommandWatchTopicUpdate). 
- assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); Awaitility.await().untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4); @@ -704,7 +704,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { admin.topics().createPartitionedTopic(topicName, 4); // 2. verify broker will push the changes to update(CommandWatchTopicUpdate). - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4); @@ -761,7 +761,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create(); // 3. verify will update the partitions and consumers - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 8); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 8); @@ -825,7 +825,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { } // 2. verify consumer can subscribe the topic. 
- assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); Awaitility.await().untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1); @@ -883,7 +883,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 2. verify consumer can subscribe the topic. // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we set the test timeout to a triple value. - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1); @@ -980,7 +980,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. 
verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2); @@ -1095,7 +1095,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 - assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().inputPattern()); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2); @@ -1188,7 +1188,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer; // 4. 
verify consumer get methods - assertSame(consumerImpl.getPattern().pattern(), pattern.pattern()); + assertSame(consumerImpl.getPattern().inputPattern(), pattern.pattern()); assertEquals(consumerImpl.getPartitionedTopics().size(), 0); producer1.send("msg-1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 9bde2aecd0f..1802bd6f59c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.topics.TopicsPattern; import org.testng.annotations.Test; @Test(groups = "broker-naming") @@ -413,4 +414,22 @@ public class ServiceConfigurationTest { ServiceConfiguration.class); assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2")); } + + @Test + public void testTopicsPatternRegexImplementationOptions() throws IOException { + ServiceConfiguration conf = loadConfString("topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK"); + assertEquals(conf.getTopicsPatternRegexImplementation(), + TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK); + conf = loadConfString("topicsPatternRegexImplementation=JDK"); + assertEquals(conf.getTopicsPatternRegexImplementation(), + TopicsPattern.RegexImplementation.JDK); + conf = loadConfString("topicsPatternRegexImplementation=RE2J"); + assertEquals(conf.getTopicsPatternRegexImplementation(), + TopicsPattern.RegexImplementation.RE2J); + } + + private static ServiceConfiguration loadConfString(String confString) throws 
IOException { + return PulsarConfigurationLoader.create(new ByteArrayInputStream(confString.getBytes(StandardCharsets.UTF_8)), + ServiceConfiguration.class); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 5003ed9673a..a31b857d251 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -18,10 +18,8 @@ */ package org.apache.pulsar.client.impl; -import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -44,12 +42,13 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T> implements TimerTask { - private final Pattern topicsPattern; + private final TopicsPattern topicsPattern; final TopicsChangedListener topicsChangeListener; private final Mode subscriptionMode; private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>(); @@ -66,7 +65,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T /*** * @param topicsPattern The regexp for the topic name(not contains partition suffix). 
*/ - public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, + public PatternMultiTopicsConsumerImpl(TopicsPattern topicsPattern, String topicsHash, PulsarClientImpl client, ConsumerConfigurationData<T> conf, @@ -80,11 +79,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T this.topicsPattern = topicsPattern; this.topicsHash = topicsHash; this.subscriptionMode = subscriptionMode; - - if (this.namespaceName == null) { - this.namespaceName = getNameSpaceFromPattern(topicsPattern); - } - checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString())); + this.namespaceName = topicsPattern.namespace(); this.topicsChangeListener = new PatternTopicsChangedListener(); this.updateTaskQueue = new PatternConsumerUpdateQueue(this); @@ -109,10 +104,6 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T } } - public static NamespaceName getNameSpaceFromPattern(Pattern pattern) { - return TopicName.get(pattern.pattern()).getNamespaceObject(); - } - /** * This method will be called after the {@link TopicListWatcher} reconnected after enabled {@link TopicListWatcher}. 
*/ @@ -135,7 +126,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T } CompletableFuture<Void> recheckTopicsChange() { - String pattern = topicsPattern.pattern(); + String pattern = topicsPattern.inputPattern(); final int epoch = recheckPatternEpoch.incrementAndGet(); return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash) .thenCompose(getTopicsResult -> { @@ -168,7 +159,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T }); } - static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern, + static CompletableFuture<Void> updateSubscriptions(TopicsPattern topicsPattern, java.util.function.Consumer<String> topicsHashSetter, GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangedListener, @@ -198,7 +189,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback)); } - public Pattern getPattern() { + public TopicsPattern getPattern() { return this.topicsPattern; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f5722838d52..950a01afb46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.re2j.Pattern; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -91,6 +90,8 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; import 
org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; +import org.apache.pulsar.common.topics.TopicsPatternFactory; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; @@ -638,7 +639,7 @@ public class PulsarClientImpl implements PulsarClient { Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode()); TopicName destination = TopicName.get(regex); NamespaceName namespaceName = destination.getNamespaceObject(); - Pattern pattern = Pattern.compile(conf.getTopicsPattern().pattern()); + TopicsPattern pattern = TopicsPatternFactory.create(conf.getTopicsPattern()); CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java index 5357b154622..5ea1e22cc30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import com.google.re2j.Pattern; import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -30,6 +29,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.topics.TopicsPattern; import org.apache.pulsar.common.util.BackoffBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class 
TopicListWatcher extends HandlerState implements ConnectionHandler. private final PatternConsumerUpdateQueue patternConsumerUpdateQueue; private final String name; private final ConnectionHandler connectionHandler; - private final Pattern topicsPattern; + private final TopicsPattern topicsPattern; private final long watcherId; private volatile long createWatcherDeadline = 0; private final NamespaceName namespace; @@ -63,11 +63,11 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler. * @param topicsPattern The regexp for the topic name(not contains partition suffix). */ public TopicListWatcher(PatternConsumerUpdateQueue patternConsumerUpdateQueue, - PulsarClientImpl client, Pattern topicsPattern, long watcherId, + PulsarClientImpl client, TopicsPattern topicsPattern, long watcherId, NamespaceName namespace, String topicsHash, CompletableFuture<TopicListWatcher> watcherFuture, Runnable recheckTopicsChangeAfterReconnect) { - super(client, topicsPattern.pattern()); + super(client, topicsPattern.topicLookupNameForTopicListWatcherPlacement()); this.patternConsumerUpdateQueue = patternConsumerUpdateQueue; this.name = "Watcher(" + topicsPattern + ")"; this.connectionHandler = new ConnectionHandler(this, @@ -131,7 +131,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler. 
synchronized (this) { setClientCnx(cnx); BaseCommand watchRequest = Commands.newWatchTopicList(requestId, watcherId, namespace.toString(), - topicsPattern.pattern(), topicsHash); + topicsPattern.inputPattern(), topicsHash); cnx.newWatchTopicList(watchRequest, requestId) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java index 0407a49da52..4bb541fbd5a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java @@ -24,12 +24,13 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.Sets; -import com.google.re2j.Pattern; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.regex.Pattern; import org.apache.pulsar.common.lookup.GetTopicsResult; +import org.apache.pulsar.common.topics.TopicsPatternFactory; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -52,7 +53,7 @@ public class PatternMultiTopicsConsumerImplTest { @Test public void testChangedUnfilteredResponse() { PatternMultiTopicsConsumerImpl.updateSubscriptions( - Pattern.compile("tenant/my-ns/name-.*"), + TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")), mockTopicsHashSetter, new GetTopicsResult(Arrays.asList( "persistent://tenant/my-ns/name-1", @@ -71,7 +72,7 @@ public class PatternMultiTopicsConsumerImplTest { @Test public void testChangedFilteredResponse() { PatternMultiTopicsConsumerImpl.updateSubscriptions( - Pattern.compile("tenant/my-ns/name-.*"), + TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")), mockTopicsHashSetter, 
new GetTopicsResult(Arrays.asList( "persistent://tenant/my-ns/name-0", @@ -90,7 +91,7 @@ public class PatternMultiTopicsConsumerImplTest { @Test public void testUnchangedResponse() { PatternMultiTopicsConsumerImpl.updateSubscriptions( - Pattern.compile("tenant/my-ns/name-.*"), + TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")), mockTopicsHashSetter, new GetTopicsResult(Arrays.asList( "persistent://tenant/my-ns/name-0", diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java index 803305911ce..601045d17c1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java @@ -27,12 +27,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; -import com.google.re2j.Pattern; import io.netty.channel.ChannelHandlerContext; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; import lombok.Cleanup; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener; @@ -41,6 +41,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.topics.TopicsPatternFactory; import org.mockito.ArgumentCaptor; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -67,7 +68,7 @@ public class TopicListWatcherTest { Timer timer = new HashedWheelTimer(); 
when(client.timer()).thenReturn(timer); String topic = "persistent://tenant/ns/topic\\d+"; - when(client.getConnection(topic, 0)). + when(client.getConnection(anyString(), anyInt())). thenReturn(clientCnxFuture.thenApply(clientCnx -> Pair.of(clientCnx, false))); when(client.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); when(connectionPool.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); @@ -82,7 +83,7 @@ public class TopicListWatcherTest { watcherFuture = new CompletableFuture<>(); watcher = new TopicListWatcher(queue, client, - Pattern.compile(topic), 7, + TopicsPatternFactory.create(Pattern.compile(topic)), 7, NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {}); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java index 26a295264fc..00390720dbb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.common.lookup; -import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -30,6 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; /*** * A value object. @@ -123,7 +123,7 @@ public class GetTopicsResult { } } - public GetTopicsResult filterTopics(Pattern topicsPattern) { + public GetTopicsResult filterTopics(TopicsPattern topicsPattern) { List<String> topicsFiltered = TopicList.filterTopics(getTopics(), topicsPattern); // If nothing changed. 
if (topicsFiltered.equals(getTopics())) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java new file mode 100644 index 00000000000..10313b59474 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.topics; + +import java.util.regex.Pattern; + +class JDKTopicsPattern implements TopicsPattern { + private final String inputPattern; + private final Pattern pattern; + + public JDKTopicsPattern(String inputPattern, String regexWithoutTopicDomainScheme) { + this.inputPattern = inputPattern; + this.pattern = Pattern.compile(regexWithoutTopicDomainScheme); + } + + public JDKTopicsPattern(Pattern pattern) { + this.pattern = pattern; + this.inputPattern = pattern.pattern(); + } + + @Override + public boolean matches(String topicName) { + return pattern.matcher(topicName).matches(); + } + + @Override + public String inputPattern() { + return inputPattern; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java new file mode 100644 index 00000000000..39de3eda925 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.topics; + +import static org.apache.pulsar.common.topics.TopicList.ALL_TOPICS_PATTERN; + +class MatchAllTopicsPattern implements TopicsPattern { + public static final MatchAllTopicsPattern INSTANCE = new MatchAllTopicsPattern(); + + private MatchAllTopicsPattern() { + // Private constructor to prevent instantiation + } + + public boolean matches(String topicName) { + return true; // Matches all topic names + } + + @Override + public String inputPattern() { + return ALL_TOPICS_PATTERN; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java new file mode 100644 index 00000000000..4e3384d58b8 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.topics; + +import com.google.re2j.Pattern; + +class RE2JTopicsPattern implements TopicsPattern { + private final String inputPattern; + private final Pattern pattern; + + public RE2JTopicsPattern(String inputPattern, String regexWithoutTopicDomainScheme) { + this.inputPattern = inputPattern; + this.pattern = Pattern.compile(regexWithoutTopicDomainScheme); + } + + @Override + public boolean matches(String topicName) { + return pattern.matcher(topicName).matches(); + } + + @Override + public String inputPattern() { + return inputPattern; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index b3dcc7d09b7..629aedb3e72 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -35,31 +35,33 @@ import org.apache.pulsar.common.naming.TopicName; @UtilityClass public class TopicList { - public static final String ALL_TOPICS_PATTERN = ".*"; private static final String SCHEME_SEPARATOR = "://"; private static final Pattern SCHEME_SEPARATOR_PATTERN = Pattern.compile(Pattern.quote(SCHEME_SEPARATOR)); + public static List<String> filterTopics(List<String> original, java.util.regex.Pattern jdkPattern) { + return filterTopics(original, TopicsPatternFactory.create(jdkPattern)); + } + // get topics that match 'topicsPattern' from original topics list // return result should contain only topic names, without partition part - public static List<String> filterTopics(List<String> original, String regex) { - Pattern topicsPattern = Pattern.compile(regex); - return filterTopics(original, topicsPattern); + public static List<String> filterTopics(List<String> original, String regex, + TopicsPattern.RegexImplementation topicsPatternImplementation) { + return filterTopics(original, TopicsPatternFactory.create(regex, 
topicsPatternImplementation)); } - public static List<String> filterTopics(List<String> original, Pattern topicsPattern) { - - - final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString())); - + /** + * Filter topics using a TopicListPattern instance. + */ + public static List<String> filterTopics(List<String> original, TopicsPattern topicsPattern) { return original.stream() .map(TopicName::get) .filter(topicName -> { String partitionedTopicName = topicName.getPartitionedTopicName(); String removedScheme = SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1]; - return shortenedTopicsPattern.matcher(removedScheme).matches(); + return topicsPattern.matches(removedScheme); }) .map(TopicName::toString) .collect(Collectors.toList()); @@ -108,6 +110,6 @@ public class TopicList { } else { throw new IllegalArgumentException("Does not support topic domain: " + prefix); } - return String.format("%s%s", prefix, removedTopicDomain); + return prefix + removedTopicDomain; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java new file mode 100644 index 00000000000..483443d6492 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.topics; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Interface for matching topic names against a pattern. + * Implementations can use different regex libraries such as RE2J or standard JDK. + * There's also an option to use RE2J with JDK fallback. + */ +public interface TopicsPattern { + /** + * The regex implementation type used by the TopicsPattern. + * RE2J is a fast regex engine that is suitable for high-performance applications. + * JDK uses the standard Java regex engine. + * RE2J_WITH_JDK_FALLBACK uses RE2J but falls back to JDK if RE2J fails to compile the pattern. + */ + enum RegexImplementation { + RE2J, + JDK, + RE2J_WITH_JDK_FALLBACK + } + + /** + * Evaluates the pattern for the given topic name which is expected to be + * passed without the domain scheme prefix in the format "tenant/namespace/topic". + * + * @param topicNameWithoutDomainSchemePrefix the topic name to match + * @return true if the topic matches the pattern, false otherwise + */ + boolean matches(String topicNameWithoutDomainSchemePrefix); + + /** + * Returns the original regex pattern used by this TopicsPattern passed as input. + * The internal implementation modifies the pattern to remove the possible topic domain scheme + * (e.g., "persistent://") since in matching the topic name, the domain scheme is ignored. + * + * @return the regex pattern as a string + */ + String inputPattern(); + + /** + * Returns the namespace associated with this TopicsPattern. 
+ * This is typically used to determine the namespace context for the pattern. + * + * @return the NamespaceName associated with this TopicsPattern + */ + default NamespaceName namespace() { + return TopicName.get(inputPattern()).getNamespaceObject(); + } + + /** + * The Topic watcher instance will be created on the broker based on a lookup for the topic name + * returned by this method. The placement reuses topic lookup so that the Pulsar load balancer can + * place the topic watcher on different brokers based on load without having to implement another load balancing + * solution for topic watchers. + * + * @return the topic lookup name for topic watcher placement + */ + default String topicLookupNameForTopicListWatcherPlacement() { + // By default, return the pattern itself since it is sufficient for a topic lookup. + // Currently Pulsar doesn't limit the characters used in the topic name part of the pattern, but this would + // be a good place to apply any sanitization if needed in the future. + return inputPattern(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java new file mode 100644 index 00000000000..50434543971 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.topics; + +import java.util.regex.Pattern; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +/** + * Factory class for creating instances of TopicsPattern based on different regex implementations. + * It supports JDK regex, RE2J regex, and a fallback mechanism for RE2J with JDK. + */ +@UtilityClass +@Slf4j +public class TopicsPatternFactory { + /** + * Creates a TopicsPattern from a JDK Pattern. + * If the pattern contains the topic domain scheme, it will be removed and a RE2J_WITH_JDK_FALLBACK implementation + * will be used. If the pattern does not contain the topic domain scheme, the input JDK Pattern instance will be + * used to evaluate topic name matches. + * The Pulsar Java Client will use this method to create a TopicsPattern from a JDK Pattern. 
+ * + * @param jdkPattern the JDK Pattern to create the TopicsPattern from + * @return a TopicsPattern instance + */ + public static TopicsPattern create(Pattern jdkPattern) { + String currentRegex = jdkPattern.pattern(); + String removedTopicDomainScheme = TopicList.removeTopicDomainScheme(currentRegex); + // If the regex pattern is already without the topic domain scheme, we can directly create a JDKTopicsPattern + if (currentRegex.equals(removedTopicDomainScheme)) { + return new JDKTopicsPattern(jdkPattern); + } else { + // If the regex pattern contains the topic domain scheme, we remove it and create a TopicsPattern + // using RE2J_WITH_JDK_FALLBACK + return internalCreate(currentRegex, removedTopicDomainScheme, + TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK); + } + } + + /** + * Creates a TopicsPattern from a regex string, using the specified regex implementation. + * The Pulsar Broker will use this method to create a TopicsPattern from a regex string. The implementation + * is configured in the broker configuration file with the `topicsPatternRegexImplementation` property. + * + * @param regex the regex string to create the TopicsPattern from + * @param implementation the regex implementation to use (RE2J, JDK, or RE2J_WITH_JDK_FALLBACK) + * @return a TopicsPattern instance + */ + public static TopicsPattern create(String regex, TopicsPattern.RegexImplementation implementation) { + return internalCreate(regex, TopicList.removeTopicDomainScheme(regex), implementation); + } + + /** + * Creates a TopicsPattern that matches all topics. 
+ * + * @return a TopicsPattern instance that matches all topics + */ + public static TopicsPattern matchAll() { + return MatchAllTopicsPattern.INSTANCE; + } + + private static TopicsPattern internalCreate(String inputPattern, String regexWithoutTopicDomainScheme, + TopicsPattern.RegexImplementation implementation) { + if (TopicList.ALL_TOPICS_PATTERN.equals(regexWithoutTopicDomainScheme)) { + return matchAll(); + } + switch (implementation) { + case RE2J: + return new RE2JTopicsPattern(inputPattern, regexWithoutTopicDomainScheme); + case JDK: + return new JDKTopicsPattern(inputPattern, regexWithoutTopicDomainScheme); + case RE2J_WITH_JDK_FALLBACK: + try { + return new RE2JTopicsPattern(inputPattern, regexWithoutTopicDomainScheme); + } catch (com.google.re2j.PatternSyntaxException e) { + if (log.isDebugEnabled()) { + log.debug("Failed to compile regex pattern '{}' with RE2J, fallback to JDK", + regexWithoutTopicDomainScheme, e); + } + // Fallback to JDK implementation if RE2J fails + return new JDKTopicsPattern(inputPattern, regexWithoutTopicDomainScheme); + } + default: + throw new IllegalArgumentException("Unknown RegexImplementation: " + implementation); + } + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java index a62aa012c7f..ee14f345029 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java @@ -18,17 +18,17 @@ */ package org.apache.pulsar.common.topics; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; -import com.google.re2j.Pattern; import java.util.Arrays; import java.util.Collections; 
import java.util.List; import java.util.Set; -import java.util.stream.Stream; +import java.util.regex.Pattern; import org.testng.annotations.Test; public class TopicListTest { @@ -44,12 +44,11 @@ public class TopicListTest { Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); List<String> result1 = TopicList.filterTopics(topicsNames, pattern1); - assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2)); + assertThat(result1).containsExactly(topicName1, topicName2); Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*"); List<String> result2 = TopicList.filterTopics(topicsNames, pattern2); - assertTrue(result2.size() == 4 - && Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains)); + assertThat(result2).containsExactly(topicName1, topicName2, topicName3, topicName4); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java new file mode 100644 index 00000000000..c244c0bc1b3 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import com.google.re2j.PatternSyntaxException;
+import java.util.regex.Pattern;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link TopicsPatternFactory}: creation from a JDK {@link Pattern}, creation from a regex
+ * string with each {@link TopicsPattern.RegexImplementation}, the match-all pattern, and the
+ * RE2J-to-JDK fallback.
+ */
+public class TopicsPatternFactoryTest {
+    public static final String DEFAULT_TOPICS_PATTERN = "tenant/namespace/test-topic-.*";
+    public static final String DEFAULT_TOPICS_PATTERN_WITH_DOMAIN = "persistent://" + DEFAULT_TOPICS_PATTERN;
+
+    /** A JDK Pattern without a topic domain scheme keeps its regex as the input pattern. */
+    @Test
+    public void testCreateWithJdkPattern() {
+        Pattern jdkPattern = Pattern.compile(DEFAULT_TOPICS_PATTERN);
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(jdkPattern);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    /**
+     * Asserts that the pattern reports the expected input pattern and matches the default topics.
+     *
+     * @param topicsPattern the pattern under test
+     * @param inputIncludesDomainScheme whether the pattern was created from
+     *                                  {@link #DEFAULT_TOPICS_PATTERN_WITH_DOMAIN}
+     */
+    private static void assertTopicsPattern(TopicsPattern topicsPattern, boolean inputIncludesDomainScheme) {
+        assertNotNull(topicsPattern);
+        assertEquals(topicsPattern.inputPattern(),
+                inputIncludesDomainScheme ? DEFAULT_TOPICS_PATTERN_WITH_DOMAIN : DEFAULT_TOPICS_PATTERN);
+        assertTopicsPatternMatches(topicsPattern);
+    }
+
+    /** Asserts that "test-topic-*" names match and that "other-topic" does not. */
+    private static void assertTopicsPatternMatches(TopicsPattern topicsPattern) {
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-abc"));
+        assertFalse(topicsPattern.matches("tenant/namespace/other-topic"));
+    }
+
+    /** A JDK Pattern that includes the topic domain scheme still matches scheme-less topic names. */
+    @Test
+    public void testCreateWithJdkPatternContainingTopicDomainScheme() {
+        Pattern jdkPattern = Pattern.compile(DEFAULT_TOPICS_PATTERN_WITH_DOMAIN);
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(jdkPattern);
+        assertTopicsPattern(topicsPattern, true);
+    }
+
+    @Test
+    public void testCreateWithStringAndRE2JImplementation() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.RE2J);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringAndJDKImplementation() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.JDK);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringAndRE2JWithJDKFallbackImplementation() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringContainingTopicDomainScheme() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN_WITH_DOMAIN,
+                TopicsPattern.RegexImplementation.JDK);
+        assertTopicsPattern(topicsPattern, true);
+    }
+
+    /** The regex ".*" must match any topic name, including the empty string. */
+    @Test
+    public void testCreateWithAllTopicsPattern() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(".*",
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertNotNull(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches(""));
+    }
+
+    @Test
+    public void testMatchAll() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.matchAll();
+        assertNotNull(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches(""));
+        assertTrue(topicsPattern.matches("tenant/namespace/very-long-topic-name"));
+    }
+
+    /**
+     * Despite the method name, the implementation passed here is valid: the invalid argument is the
+     * {@code null} regex, which is expected to be rejected with a {@link NullPointerException}.
+     */
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testCreateWithInvalidImplementation() {
+        TopicsPatternFactory.create(null, TopicsPattern.RegexImplementation.JDK);
+    }
+
+    @Test
+    public void testRE2JFallbackToJDK() {
+        // example of excluding topics that start with "test2-" or "test3-" or "other-", but require that the topic
+        // name ends with "-topic" or "-topic-" and any suffix
+        // This regex cannot be parsed by RE2J, so it should fall back to JDK regex
+        String complexRegex = "tenant/namespace/(?!(test2|test3|other)-).+-topic(-.*)?";
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(complexRegex,
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertNotNull(topicsPattern);
+        assertTopicsPatternMatches(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertFalse(topicsPattern.matches("tenant/namespace/test2-topic"));
+        assertFalse(topicsPattern.matches("tenant/namespace/test3-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test4-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test20-topic"));
+    }
+
+    /** With the plain RE2J implementation there is no fallback, so the RE2J syntax error propagates. */
+    @Test(expectedExceptions = PatternSyntaxException.class)
+    public void testRE2JParsingFailure() {
+        String complexRegex = "tenant/namespace/(?!(test2|test3|other)-).+-topic(-.*)?";
+        // The result is intentionally discarded: creation itself is expected to throw.
+        TopicsPatternFactory.create(complexRegex, TopicsPattern.RegexImplementation.RE2J);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java new file mode 100644 index 00000000000..68e54cc2a29 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.regex.Pattern;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the namespace and lookup-name accessors of {@link TopicsPattern}.
+ */
+public class TopicsPatternTest {
+
+    /**
+     * Verifies that {@link TopicsPattern#namespace()} yields the tenant and namespace of the pattern,
+     * both without a topic domain scheme and with the persistent / non-persistent schemes.
+     */
+    @Test
+    public void testNamespace() {
+        // Test with a pattern that has no topic domain scheme
+        assertNamespace("tenant/namespace/topic-.*", "tenant", "namespace");
+
+        // Test with a pattern that includes the persistent topic domain scheme
+        assertNamespace("persistent://tenant/namespace/topic-.*", "tenant", "namespace");
+
+        // Test with a more complex topic pattern
+        assertNamespace("persistent://my-tenant/my-namespace/prefix-.*-suffix", "my-tenant", "my-namespace");
+
+        // Test with non-persistent topic pattern
+        assertNamespace("non-persistent://test-tenant/test-namespace/.*", "test-tenant", "test-namespace");
+    }
+
+    /**
+     * Creates a TopicsPattern from {@code regex} and checks the namespace it reports. Each case goes
+     * through this helper so that the assertions always target the pattern that was just created.
+     *
+     * @param regex the topics pattern regex to compile with the JDK and pass to the factory
+     * @param expectedTenant the tenant the reported namespace must have
+     * @param expectedLocalName the local namespace name the reported namespace must have
+     */
+    private static void assertNamespace(String regex, String expectedTenant, String expectedLocalName) {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(Pattern.compile(regex));
+        NamespaceName namespace = topicsPattern.namespace();
+        assertEquals(namespace.getTenant(), expectedTenant);
+        assertEquals(namespace.getLocalName(), expectedLocalName);
+        assertEquals(namespace.toString(), expectedTenant + "/" + expectedLocalName);
+    }
+
+    /**
+     * Verifies that the lookup name used for topic list watcher placement parses as a topic name in
+     * the pattern's tenant and namespace, with the regex remainder as its local name.
+     */
+    @Test
+    public void testTopicLookupNameForTopicListWatcherPlacement() {
+        TopicsPattern pattern =
+                TopicsPatternFactory.create(Pattern.compile("persistent://tenant/namespace/topic-.*(\\d+)?"));
+        String lookupName = pattern.topicLookupNameForTopicListWatcherPlacement();
+        assertNotNull(lookupName);
+        TopicName lookupTopicName = TopicName.get(lookupName);
+        assertEquals(lookupTopicName.getTenant(), "tenant");
+        assertEquals(lookupTopicName.getNamespacePortion(), "namespace");
+        assertEquals(lookupTopicName.getLocalName(), "topic-.*(\\d+)?");
+    }
+}
\ No newline at end of file
