This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 941cda992037f74a5c2e26b2463840ac101126c8
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Jul 22 19:55:59 2025 +0300

    [fix][misc] Fix topics pattern consumer backwards compatibility (#24537)
    
    (cherry picked from commit c358e7135e0b115dfb7aee5617ed1c90b6996e78)
---
 conf/broker.conf                                   |  10 ++
 conf/standalone.conf                               |  10 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  15 +++
 .../apache/pulsar/broker/service/ServerCnx.java    |  21 ++--
 .../pulsar/broker/service/TopicListService.java    |  39 ++++--
 .../broker/service/TopicListServiceTest.java       |  16 +--
 .../broker/service/TopicListWatcherTest.java       |   7 +-
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   2 +-
 .../client/impl/PatternTopicsConsumerImplTest.java |  28 ++---
 .../common/naming/ServiceConfigurationTest.java    |  19 +++
 .../impl/PatternMultiTopicsConsumerImpl.java       |  23 ++--
 .../pulsar/client/impl/PulsarClientImpl.java       |   5 +-
 .../pulsar/client/impl/TopicListWatcher.java       |  10 +-
 .../impl/PatternMultiTopicsConsumerImplTest.java   |   9 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   |   7 +-
 .../pulsar/common/lookup/GetTopicsResult.java      |   4 +-
 .../pulsar/common/topics/JDKTopicsPattern.java     |  46 +++++++
 .../common/topics/MatchAllTopicsPattern.java       |  38 ++++++
 .../pulsar/common/topics/RE2JTopicsPattern.java    |  41 +++++++
 .../org/apache/pulsar/common/topics/TopicList.java |  24 ++--
 .../apache/pulsar/common/topics/TopicsPattern.java |  84 +++++++++++++
 .../pulsar/common/topics/TopicsPatternFactory.java | 103 ++++++++++++++++
 .../apache/pulsar/common/topics/TopicListTest.java |   9 +-
 .../common/topics/TopicsPatternFactoryTest.java    | 136 +++++++++++++++++++++
 .../pulsar/common/topics/TopicsPatternTest.java    |  74 +++++++++++
 25 files changed, 688 insertions(+), 92 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5d4aeab64ad..2a7befafed3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -730,6 +730,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true
 # Longer patterns are rejected to avoid patterns that are crafted to overload 
the broker.
 subscriptionPatternMaxLength=50
 
+# The regular expression implementation to use for topic pattern matching.
+# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and 
falls back to
+# the JDK implementation for backwards compatibility reasons when the pattern 
compilation fails
+# with the RE2/j library.
+# RE2J is more performant but does not support all regex features (e.g. 
negative lookaheads).
+# JDK uses the standard Java regex implementation which supports all features 
but can be slower.
+# Bad or malicious regex patterns requiring extensive backtracking could cause 
high resource usage
+# with RE2J_WITH_JDK_FALLBACK or JDK implementations.
+topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 1aaef7ed36e..22d769c7db0 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -386,6 +386,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true
 # Longer patterns are rejected to avoid patterns that are crafted to overload 
the broker.
 subscriptionPatternMaxLength=50
 
+# The regular expression implementation to use for topic pattern matching.
+# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and 
falls back to
+# the JDK implementation for backwards compatibility reasons when the pattern 
compilation fails
+# with the RE2/j library.
+# RE2J is more performant but does not support all regex features (e.g. 
negative lookaheads).
+# JDK uses the standard Java regex implementation which supports all features 
but can be slower.
+# Bad or malicious regex patterns requiring extensive backtracking could cause 
high resource usage
+# with RE2J_WITH_JDK_FALLBACK or JDK implementations.
+topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK
+
 ### --- Metadata Store --- ###
 
 # Whether we should enable metadata operations batching
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index df102a2f2aa..c8d65cb46af 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -51,6 +51,7 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
 import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -1569,6 +1570,20 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                     + "# NOTE: This flag will be removed in some major 
releases in the future.\n")
     private boolean strictTopicNameEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The regular expression implementation to use for topic 
pattern matching. \n"
+                    + "RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J 
implementation and falls back to "
+                    + "the JDK implementation for backwards compatibility 
reasons when the pattern compilation fails "
+                    + "with the RE2/j library.\n"
+                    + "RE2J is more performant but does not support all regex 
features (e.g. negative lookaheads). \n"
+                    + "JDK uses the standard Java regex implementation which 
supports all features but can be slower.\n"
+                    + "Bad or malicious regex patterns requiring extensive 
backtracing could cause high resource usage "
+                    + "with RE2J_WITH_JDK_FALLBACK or JDK implementations."
+    )
+    private TopicsPattern.RegexImplementation topicsPatternRegexImplementation 
=
+            TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK;
+
     @FieldContext(
             category = CATEGORY_SCHEMA,
             doc = "The schema compatibility strategy to use for system topics"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b0774513057..f68c851af3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -32,7 +32,6 @@ import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.re2j.Pattern;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -175,6 +174,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.StringInterner;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -252,6 +252,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final long resumeThresholdPendingBytesPerThread;
 
     private final long connectionLivenessCheckTimeoutMillis;
+    private final TopicsPattern.RegexImplementation 
topicsPatternImplementation;
 
     // Tracks and limits number of bytes pending to be published from a single 
specific IO thread.
     static final class PendingBytesPerThreadTracker {
@@ -354,6 +355,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength);
         this.brokerInterceptor = this.service != null ? 
this.service.getInterceptor() : null;
         this.throttleTracker = new ServerCnxThrottleTracker(this);
+        topicsPatternImplementation = 
conf.getTopicsPatternRegexImplementation();
     }
 
     @Override
@@ -2521,7 +2523,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             if (enableSubscriptionPatternEvaluation && 
topicsPattern.isPresent()) {
                                 if (topicsPattern.get().length() <= 
maxSubscriptionPatternLength) {
                                     filterTopics = true;
-                                    filteredTopics = 
TopicList.filterTopics(filteredTopics, topicsPattern.get());
+                                    filteredTopics = 
TopicList.filterTopics(filteredTopics, topicsPattern.get(),
+                                            topicsPatternImplementation);
                                 } else {
                                     log.info("[{}] Subscription pattern 
provided [{}] was longer than maximum {}.",
                                             remoteAddress, 
topicsPattern.get(), maxSubscriptionPatternLength);
@@ -3165,18 +3168,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final long watcherId = commandWatchTopicList.getWatcherId();
         final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
 
-        Pattern topicsPattern = 
Pattern.compile(commandWatchTopicList.hasTopicsPattern()
-                ? 
TopicList.removeTopicDomainScheme(commandWatchTopicList.getTopicsPattern())
-                : TopicList.ALL_TOPICS_PATTERN);
-        String topicsHash = commandWatchTopicList.hasTopicsHash()
-                ? commandWatchTopicList.getTopicsHash() : null;
-
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
                 if (isAuthorized) {
-                    topicListService.handleWatchTopicList(namespaceName, 
watcherId, requestId, topicsPattern,
-                            topicsHash, lookupSemaphore);
+                    String topicsPatternString = 
commandWatchTopicList.hasTopicsPattern()
+                            ? commandWatchTopicList.getTopicsPattern() : 
TopicList.ALL_TOPICS_PATTERN;
+                    String topicsHash = commandWatchTopicList.hasTopicsHash()
+                            ? commandWatchTopicList.getTopicsHash() : null;
+                    topicListService.handleWatchTopicList(namespaceName, 
watcherId, requestId, topicsPatternString,
+                            topicsPatternImplementation, topicsHash, 
lookupSemaphore);
                 } else {
                     final String msg = "Proxy Client is not authorized to 
watchTopicList";
                     log.warn("[{}] {} with role {} on namespace {}", 
remoteAddress, msg, getPrincipal(), namespaceName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 818188fd182..ef2ea284cf7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import com.google.re2j.Pattern;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +32,8 @@ import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
+import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.slf4j.Logger;
@@ -48,13 +49,13 @@ public class TopicListService {
         private final TopicListService topicListService;
         private final long id;
         /** The regexp for the topic name(not contains partition suffix). **/
-        private final Pattern topicsPattern;
+        private final TopicsPattern topicsPattern;
 
         /***
          * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
          */
         public TopicListWatcher(TopicListService topicListService, long id,
-                                Pattern topicsPattern, List<String> topics) {
+                                TopicsPattern topicsPattern, List<String> 
topics) {
             this.topicListService = topicListService;
             this.id = id;
             this.topicsPattern = topicsPattern;
@@ -71,7 +72,9 @@ public class TopicListService {
         @Override
         public void accept(String topicName, NotificationType 
notificationType) {
             String partitionedTopicName = 
TopicName.get(topicName).getPartitionedTopicName();
-            if 
(topicsPattern.matcher(TopicList.removeTopicDomainScheme(partitionedTopicName)).matches())
 {
+            String domainLessTopicName = 
TopicList.removeTopicDomainScheme(partitionedTopicName);
+
+            if (topicsPattern.matches(domainLessTopicName)) {
                 List<String> newTopics;
                 List<String> deletedTopics;
                 if (notificationType == NotificationType.Deleted) {
@@ -120,12 +123,17 @@ public class TopicListService {
     }
 
     /***
-     * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
+     * @param topicsPatternString The regexp for the topic name
      */
-    public void handleWatchTopicList(NamespaceName namespaceName, long 
watcherId, long requestId, Pattern topicsPattern,
-                                     String topicsHash, Semaphore 
lookupSemaphore) {
+    public void handleWatchTopicList(NamespaceName namespaceName, long 
watcherId, long requestId,
+                                     String topicsPatternString,
+                                     TopicsPattern.RegexImplementation 
topicsPatternRegexImplementation,
+                                     String topicsHash,
+                                     Semaphore lookupSemaphore) {
+        // remove the domain scheme from the topic pattern
+        topicsPatternString = 
TopicList.removeTopicDomainScheme(topicsPatternString);
 
-        if (!enableSubscriptionPatternEvaluation || 
topicsPattern.pattern().length() > maxSubscriptionPatternLength) {
+        if (!enableSubscriptionPatternEvaluation || 
topicsPatternString.length() > maxSubscriptionPatternLength) {
             String msg = "Unable to create topic list watcher: ";
             if (!enableSubscriptionPatternEvaluation) {
                 msg += "Evaluating subscription patterns is disabled.";
@@ -137,6 +145,19 @@ public class TopicListService {
             lookupSemaphore.release();
             return;
         }
+
+        TopicsPattern topicsPattern;
+        try {
+            topicsPattern = TopicsPatternFactory.create(topicsPatternString, 
topicsPatternRegexImplementation);
+        } catch (Exception e) {
+            log.warn("[{}] Unable to create topic list watcher: Invalid 
pattern: {} on namespace {}",
+                    connection.toString(), topicsPatternString, namespaceName);
+            connection.getCommandSender().sendErrorResponse(requestId, 
ServerError.InvalidTopicName,
+                    "Invalid topics pattern: " + e.getMessage());
+            lookupSemaphore.release();
+            return;
+        }
+
         CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
         CompletableFuture<TopicListWatcher> existingWatcherFuture = 
watchers.putIfAbsent(watcherId, watcherFuture);
 
@@ -201,7 +222,7 @@ public class TopicListService {
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
     public void 
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
-            NamespaceName namespace, long watcherId, Pattern topicsPattern) {
+            NamespaceName namespace, long watcherId, TopicsPattern 
topicsPattern) {
         namespaceService.getListOfPersistentTopics(namespace).
                 thenApply(topics -> {
                     TopicListWatcher watcher = new TopicListWatcher(this, 
watcherId, topicsPattern, topics);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
index 95643c5ae45..9109828c025 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
-import com.google.re2j.Pattern;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
@@ -39,6 +38,7 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -50,6 +50,8 @@ public class TopicListServiceTest {
     private CompletableFuture<List<String>> topicListFuture;
     private Semaphore lookupSemaphore;
     private TopicResources topicResources;
+    private final TopicsPattern.RegexImplementation 
topicsPatternImplementation =
+            TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
@@ -80,8 +82,8 @@ public class TopicListServiceTest {
                 NamespaceName.get("tenant/ns"),
                 13,
                 7,
-                Pattern.compile("persistent://tenant/ns/topic\\d"),
-                null,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
                 lookupSemaphore);
         List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
         String hash = TopicList.calculateHash(topics);
@@ -98,8 +100,8 @@ public class TopicListServiceTest {
                 NamespaceName.get("tenant/ns"),
                 13,
                 7,
-                Pattern.compile("persistent://tenant/ns/topic\\d"),
-                null,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
                 lookupSemaphore);
         topicListFuture.completeExceptionally(new 
PulsarServerException("Error"));
         Assert.assertEquals(1, lookupSemaphore.availablePermits());
@@ -114,8 +116,8 @@ public class TopicListServiceTest {
                 NamespaceName.get("tenant/ns"),
                 13,
                 7,
-                Pattern.compile("persistent://tenant/ns/topic\\d"),
-                null,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
                 lookupSemaphore);
         List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
         topicListFuture.complete(topics);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
index 086051d3043..884cdc0ef92 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
@@ -21,11 +21,12 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
-import com.google.re2j.Pattern;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
+import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -40,7 +41,8 @@ public class TopicListWatcherTest {
     );
 
     private static final long ID = 7;
-    private static final Pattern PATTERN = 
Pattern.compile("tenant/ns/topic\\d+");
+    private static final TopicsPattern PATTERN =
+            TopicsPatternFactory.create("tenant/ns/topic\\d+", 
TopicsPattern.RegexImplementation.RE2J);
 
 
     private TopicListService topicListService;
@@ -98,5 +100,4 @@ public class TopicListWatcherTest {
                 Arrays.asList("persistent://tenant/ns/topic1", 
"persistent://tenant/ns/topic2"),
                 watcher.getMatchingTopics());
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index 061a6baa974..e441de3ea1a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -204,7 +204,7 @@ public class PatternTopicsConsumerImplAuthTest extends 
ProducerConsumerBase {
         
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
         // 4. verify consumer
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 90031ff92ea..c1b96a058f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -227,7 +227,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
@@ -311,7 +311,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
@@ -394,7 +394,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
@@ -492,7 +492,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
@@ -569,7 +569,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         // 3. verify consumer get methods, to get 5 number of partitions and 
topics.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 5);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 5);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
@@ -593,7 +593,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 5. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
         Awaitility.await().untilAsserted(() -> {
-            assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+            assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
@@ -665,7 +665,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 2. verify consumer get methods. There is no need to trigger 
discovery, because the broker will push the
         // changes to update(CommandWatchTopicUpdate).
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         Awaitility.await().untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 4);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 4);
@@ -704,7 +704,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         admin.topics().createPartitionedTopic(topicName, 4);
 
         // 2. verify broker will push the changes to 
update(CommandWatchTopicUpdate).
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 4);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 4);
@@ -761,7 +761,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
 
         // 3. verify will update the partitions and consumers
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 8);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 8);
@@ -825,7 +825,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
 
         // 2. verify consumer can subscribe the topic.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         Awaitility.await().untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 1);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 1);
@@ -883,7 +883,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 2. verify consumer can subscribe the topic.
         // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we 
set the test timeout to a triple value.
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 1);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 1);
@@ -980,7 +980,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 6 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
@@ -1095,7 +1095,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 0 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().inputPattern());
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
@@ -1188,7 +1188,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         PatternMultiTopicsConsumerImpl<String> consumerImpl = 
(PatternMultiTopicsConsumerImpl<String>) consumer;
 
         // 4. verify consumer get methods
-        assertSame(consumerImpl.getPattern().pattern(), pattern.pattern());
+        assertSame(consumerImpl.getPattern().inputPattern(), 
pattern.pattern());
         assertEquals(consumerImpl.getPartitionedTopics().size(), 0);
 
         producer1.send("msg-1");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 9bde2aecd0f..1802bd6f59c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-naming")
@@ -413,4 +414,22 @@ public class ServiceConfigurationTest {
                 ServiceConfiguration.class);
         assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2"));
     }
+
+    @Test
+    public void testTopicsPatternRegexImplementationOptions() throws 
IOException {
+        ServiceConfiguration conf = 
loadConfString("topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK");
+        assertEquals(conf.getTopicsPatternRegexImplementation(),
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        conf = loadConfString("topicsPatternRegexImplementation=JDK");
+        assertEquals(conf.getTopicsPatternRegexImplementation(),
+                TopicsPattern.RegexImplementation.JDK);
+        conf = loadConfString("topicsPatternRegexImplementation=RE2J");
+        assertEquals(conf.getTopicsPatternRegexImplementation(),
+                TopicsPattern.RegexImplementation.RE2J);
+    }
+
+    private static ServiceConfiguration loadConfString(String confString) 
throws IOException {
+        return PulsarConfigurationLoader.create(new 
ByteArrayInputStream(confString.getBytes(StandardCharsets.UTF_8)),
+                ServiceConfiguration.class);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 5003ed9673a..a31b857d251 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.re2j.Pattern;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
@@ -44,12 +42,13 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T> implements TimerTask {
-    private final Pattern topicsPattern;
+    private final TopicsPattern topicsPattern;
     final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
     private final CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
@@ -66,7 +65,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
-    public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
+    public PatternMultiTopicsConsumerImpl(TopicsPattern topicsPattern,
                                           String topicsHash,
                                           PulsarClientImpl client,
                                           ConsumerConfigurationData<T> conf,
@@ -80,11 +79,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         this.topicsPattern = topicsPattern;
         this.topicsHash = topicsHash;
         this.subscriptionMode = subscriptionMode;
-
-        if (this.namespaceName == null) {
-            this.namespaceName = getNameSpaceFromPattern(topicsPattern);
-        }
-        
checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString()));
+        this.namespaceName = topicsPattern.namespace();
 
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
@@ -109,10 +104,6 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         }
     }
 
-    public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
-        return TopicName.get(pattern.pattern()).getNamespaceObject();
-    }
-
     /**
      * This method will be called after the {@link TopicListWatcher} 
reconnected after enabled {@link TopicListWatcher}.
      */
@@ -135,7 +126,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     }
 
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.pattern();
+        String pattern = topicsPattern.inputPattern();
         final int epoch = recheckPatternEpoch.incrementAndGet();
         return client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode, pattern, topicsHash)
             .thenCompose(getTopicsResult -> {
@@ -168,7 +159,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             });
     }
 
-    static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
+    static CompletableFuture<Void> updateSubscriptions(TopicsPattern 
topicsPattern,
                                                        
java.util.function.Consumer<String> topicsHashSetter,
                                                        GetTopicsResult 
getTopicsResult,
                                                        TopicsChangedListener 
topicsChangedListener,
@@ -198,7 +189,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         return 
FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
     }
 
-    public Pattern getPattern() {
+    public TopicsPattern getPattern() {
         return this.topicsPattern;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f5722838d52..950a01afb46 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.re2j.Pattern;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
@@ -91,6 +90,8 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
+import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -638,7 +639,7 @@ public class PulsarClientImpl implements PulsarClient {
         Mode subscriptionMode = 
convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
         TopicName destination = TopicName.get(regex);
         NamespaceName namespaceName = destination.getNamespaceObject();
-        Pattern pattern = Pattern.compile(conf.getTopicsPattern().pattern());
+        TopicsPattern pattern = 
TopicsPatternFactory.create(conf.getTopicsPattern());
 
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
         lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, 
null)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 5357b154622..5ea1e22cc30 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.re2j.Pattern;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +29,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.util.BackoffBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class TopicListWatcher extends HandlerState implements 
ConnectionHandler.
     private final PatternConsumerUpdateQueue patternConsumerUpdateQueue;
     private final String name;
     private final ConnectionHandler connectionHandler;
-    private final Pattern topicsPattern;
+    private final TopicsPattern topicsPattern;
     private final long watcherId;
     private volatile long createWatcherDeadline = 0;
     private final NamespaceName namespace;
@@ -63,11 +63,11 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
     public TopicListWatcher(PatternConsumerUpdateQueue 
patternConsumerUpdateQueue,
-                            PulsarClientImpl client, Pattern topicsPattern, 
long watcherId,
+                            PulsarClientImpl client, TopicsPattern 
topicsPattern, long watcherId,
                             NamespaceName namespace, String topicsHash,
                             CompletableFuture<TopicListWatcher> watcherFuture,
                             Runnable recheckTopicsChangeAfterReconnect) {
-        super(client, topicsPattern.pattern());
+        super(client, 
topicsPattern.topicLookupNameForTopicListWatcherPlacement());
         this.patternConsumerUpdateQueue = patternConsumerUpdateQueue;
         this.name = "Watcher(" + topicsPattern + ")";
         this.connectionHandler = new ConnectionHandler(this,
@@ -131,7 +131,7 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
         synchronized (this) {
             setClientCnx(cnx);
             BaseCommand watchRequest = Commands.newWatchTopicList(requestId, 
watcherId, namespace.toString(),
-                            topicsPattern.pattern(), topicsHash);
+                            topicsPattern.inputPattern(), topicsHash);
 
             cnx.newWatchTopicList(watchRequest, requestId)
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
index 0407a49da52..4bb541fbd5a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
@@ -24,12 +24,13 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import com.google.common.collect.Sets;
-import com.google.re2j.Pattern;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
+import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -52,7 +53,7 @@ public class PatternMultiTopicsConsumerImplTest {
     @Test
     public void testChangedUnfilteredResponse() {
         PatternMultiTopicsConsumerImpl.updateSubscriptions(
-                Pattern.compile("tenant/my-ns/name-.*"),
+                
TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")),
                 mockTopicsHashSetter,
                 new GetTopicsResult(Arrays.asList(
                         "persistent://tenant/my-ns/name-1",
@@ -71,7 +72,7 @@ public class PatternMultiTopicsConsumerImplTest {
     @Test
     public void testChangedFilteredResponse() {
         PatternMultiTopicsConsumerImpl.updateSubscriptions(
-                Pattern.compile("tenant/my-ns/name-.*"),
+                
TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")),
                 mockTopicsHashSetter,
                 new GetTopicsResult(Arrays.asList(
                         "persistent://tenant/my-ns/name-0",
@@ -90,7 +91,7 @@ public class PatternMultiTopicsConsumerImplTest {
     @Test
     public void testUnchangedResponse() {
         PatternMultiTopicsConsumerImpl.updateSubscriptions(
-                Pattern.compile("tenant/my-ns/name-.*"),
+                
TopicsPatternFactory.create(Pattern.compile("tenant/my-ns/name-.*")),
                 mockTopicsHashSetter,
                 new GetTopicsResult(Arrays.asList(
                         "persistent://tenant/my-ns/name-0",
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 803305911ce..601045d17c1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -27,12 +27,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
-import com.google.re2j.Pattern;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
 import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -67,7 +68,7 @@ public class TopicListWatcherTest {
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
         String topic = "persistent://tenant/ns/topic\\d+";
-        when(client.getConnection(topic, 0)).
+        when(client.getConnection(anyString(), anyInt())).
                 thenReturn(clientCnxFuture.thenApply(clientCnx -> 
Pair.of(clientCnx, false)));
         when(client.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
         when(connectionPool.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
@@ -82,7 +83,7 @@ public class TopicListWatcherTest {
 
         watcherFuture = new CompletableFuture<>();
         watcher = new TopicListWatcher(queue, client,
-                Pattern.compile(topic), 7,
+                TopicsPatternFactory.create(Pattern.compile(topic)), 7,
                 NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {});
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
index 26a295264fc..00390720dbb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.common.lookup;
 
-import com.google.re2j.Pattern;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -30,6 +29,7 @@ import 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.topics.TopicsPattern;
 
 /***
  * A value object.
@@ -123,7 +123,7 @@ public class GetTopicsResult {
         }
     }
 
-    public GetTopicsResult filterTopics(Pattern topicsPattern) {
+    public GetTopicsResult filterTopics(TopicsPattern topicsPattern) {
         List<String> topicsFiltered = TopicList.filterTopics(getTopics(), 
topicsPattern);
         // If nothing changed.
         if (topicsFiltered.equals(getTopics())) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java
new file mode 100644
index 00000000000..10313b59474
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/JDKTopicsPattern.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import java.util.regex.Pattern;
+
+class JDKTopicsPattern implements TopicsPattern {
+    private final String inputPattern;
+    private final Pattern pattern;
+
+    public JDKTopicsPattern(String inputPattern, String 
regexWithoutTopicDomainScheme) {
+        this.inputPattern = inputPattern;
+        this.pattern = Pattern.compile(regexWithoutTopicDomainScheme);
+    }
+
+    public JDKTopicsPattern(Pattern pattern) {
+        this.pattern = pattern;
+        this.inputPattern = pattern.pattern();
+    }
+
+    @Override
+    public boolean matches(String topicName) {
+        return pattern.matcher(topicName).matches();
+    }
+
+    @Override
+    public String inputPattern() {
+        return inputPattern;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java
new file mode 100644
index 00000000000..39de3eda925
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/MatchAllTopicsPattern.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import static org.apache.pulsar.common.topics.TopicList.ALL_TOPICS_PATTERN;
+
+class MatchAllTopicsPattern implements TopicsPattern {
+    public static final MatchAllTopicsPattern INSTANCE = new 
MatchAllTopicsPattern();
+
+    private MatchAllTopicsPattern() {
+        // Private constructor to prevent instantiation
+    }
+
+    public boolean matches(String topicName) {
+        return true; // Matches all topic names
+    }
+
+    @Override
+    public String inputPattern() {
+        return ALL_TOPICS_PATTERN;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java
new file mode 100644
index 00000000000..4e3384d58b8
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/RE2JTopicsPattern.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import com.google.re2j.Pattern;
+
+class RE2JTopicsPattern implements TopicsPattern {
+    private final String inputPattern;
+    private final Pattern pattern;
+
+    public RE2JTopicsPattern(String inputPattern, String 
regexWithoutTopicDomainScheme) {
+        this.inputPattern = inputPattern;
+        this.pattern = Pattern.compile(regexWithoutTopicDomainScheme);
+    }
+
+    @Override
+    public boolean matches(String topicName) {
+        return pattern.matcher(topicName).matches();
+    }
+
+    @Override
+    public String inputPattern() {
+        return inputPattern;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index b3dcc7d09b7..629aedb3e72 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -35,31 +35,33 @@ import org.apache.pulsar.common.naming.TopicName;
 
 @UtilityClass
 public class TopicList {
-
     public static final String ALL_TOPICS_PATTERN = ".*";
 
     private static final String SCHEME_SEPARATOR = "://";
 
     private static final Pattern SCHEME_SEPARATOR_PATTERN = 
Pattern.compile(Pattern.quote(SCHEME_SEPARATOR));
 
+    public static List<String> filterTopics(List<String> original, 
java.util.regex.Pattern jdkPattern) {
+        return filterTopics(original, TopicsPatternFactory.create(jdkPattern));
+    }
+
     // get topics that match 'topicsPattern' from original topics list
     // return result should contain only topic names, without partition part
-    public static List<String> filterTopics(List<String> original, String 
regex) {
-        Pattern topicsPattern = Pattern.compile(regex);
-        return filterTopics(original, topicsPattern);
+    public static List<String> filterTopics(List<String> original, String 
regex,
+                                            TopicsPattern.RegexImplementation 
topicsPatternImplementation) {
+        return filterTopics(original, TopicsPatternFactory.create(regex, 
topicsPatternImplementation));
     }
 
-    public static List<String> filterTopics(List<String> original, Pattern 
topicsPattern) {
-
-
-        final Pattern shortenedTopicsPattern = 
Pattern.compile(removeTopicDomainScheme(topicsPattern.toString()));
-
+    /**
+     * Filter topics using a TopicsPattern instance.
+     */
+    public static List<String> filterTopics(List<String> original, 
TopicsPattern topicsPattern) {
         return original.stream()
                 .map(TopicName::get)
                 .filter(topicName -> {
                     String partitionedTopicName = 
topicName.getPartitionedTopicName();
                     String removedScheme = 
SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
-                    return 
shortenedTopicsPattern.matcher(removedScheme).matches();
+                    return topicsPattern.matches(removedScheme);
                 })
                 .map(TopicName::toString)
                 .collect(Collectors.toList());
@@ -108,6 +110,6 @@ public class TopicList {
         } else {
             throw new IllegalArgumentException("Does not support topic domain: 
" + prefix);
         }
-        return String.format("%s%s", prefix, removedTopicDomain);
+        return prefix + removedTopicDomain;
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java
new file mode 100644
index 00000000000..483443d6492
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPattern.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Interface for matching topic names against a pattern.
+ * Implementations can use different regex libraries such as RE2J or standard 
JDK.
+ * There's also an option to use RE2J with JDK fallback.
+ */
+public interface TopicsPattern {
+    /**
+     * The regex implementation type used by the TopicsPattern.
+     * RE2J is a fast regex engine that is suitable for high-performance 
applications.
+     * JDK uses the standard Java regex engine.
+     * RE2J_WITH_JDK_FALLBACK uses RE2J but falls back to JDK if RE2J fails to 
compile the pattern.
+     */
+    enum RegexImplementation {
+        RE2J,
+        JDK,
+        RE2J_WITH_JDK_FALLBACK
+    }
+
+    /**
+     * Evaluates the pattern for the given topic name which is expected to be
+     * passed without the domain scheme prefix in the format 
"tenant/namespace/topic".
+     *
+     * @param topicNameWithoutDomainSchemePrefix the topic name to match
+     * @return true if the topic matches the pattern, false otherwise
+     */
+    boolean matches(String topicNameWithoutDomainSchemePrefix);
+
+    /**
+     * Returns the original regex pattern that was passed as input when this 
TopicsPattern was created.
+     * The internal implementation modifies the pattern to remove the possible 
topic domain scheme
+     * (e.g., "persistent://") since in matching the topic name, the domain 
scheme is ignored.
+     *
+     * @return the regex pattern as a string
+     */
+    String inputPattern();
+
+    /**
+     * Returns the namespace associated with this TopicsPattern.
+     * This is typically used to determine the namespace context for the 
pattern.
+     *
+     * @return the NamespaceName associated with this TopicsPattern
+     */
+    default NamespaceName namespace() {
+        return TopicName.get(inputPattern()).getNamespaceObject();
+    }
+
+    /**
+     * The Topic watcher instance will be created on the broker based on a 
lookup for the topic name
+     * returned by this method. The placement reuses topic lookup so that the 
Pulsar load balancer can
+     * place the topic watcher on different brokers based on load without 
having to implement another load balancing
+     * solution for topic watchers.
+     *
+     * @return the topic lookup name for topic watcher placement
+     */
+    default String topicLookupNameForTopicListWatcherPlacement() {
+        // By default, return the pattern itself since it is sufficient for a 
topic lookup.
+        // Currently Pulsar doesn't limit the characters used in the topic 
name part of the pattern, but this would
+        // be a good place to apply any sanitization if needed in the future.
+        return inputPattern();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
new file mode 100644
index 00000000000..50434543971
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import java.util.regex.Pattern;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Factory class for creating instances of TopicsPattern based on different 
regex implementations.
+ * It supports JDK regex, RE2J regex, and a fallback mechanism for RE2J with 
JDK.
+ */
+@UtilityClass
+@Slf4j
+public class TopicsPatternFactory {
+    /**
+     * Creates a TopicsPattern from a JDK Pattern.
+     * If the pattern contains the topic domain scheme, it will be removed and 
a RE2J_WITH_JDK_FALLBACK implementation
+     * will be used. If the pattern does not contain the topic domain scheme, 
the input JDK Pattern instance will be
+     * used to evaluate topic name matches.
+     * The Pulsar Java Client will use this method to create a TopicsPattern 
from a JDK Pattern.
+     *
+     * @param jdkPattern the JDK Pattern to create the TopicsPattern from
+     * @return a TopicsPattern instance
+     */
+    public static TopicsPattern create(Pattern jdkPattern) {
+        String currentRegex = jdkPattern.pattern();
+        String removedTopicDomainScheme = 
TopicList.removeTopicDomainScheme(currentRegex);
+        // If the regex pattern is already without the topic domain scheme, we 
can directly create a JDKTopicsPattern
+        if (currentRegex.equals(removedTopicDomainScheme)) {
+            return new JDKTopicsPattern(jdkPattern);
+        } else {
+            // If the regex pattern contains the topic domain scheme, we 
remove it and create a TopicsPattern
+            // using RE2J_WITH_JDK_FALLBACK
+            return internalCreate(currentRegex, removedTopicDomainScheme,
+                    TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        }
+    }
+
+    /**
+     * Creates a TopicsPattern from a regex string, using the specified regex 
implementation.
+     * The Pulsar Broker will use this method to create a TopicsPattern from a 
regex string. The implementation
+     * is configured in the broker configuration file with the 
`topicsPatternRegexImplementation` property.
+     *
+     * @param regex the regex string to create the TopicsPattern from
+     * @param implementation the regex implementation to use (RE2J, JDK, or 
RE2J_WITH_JDK_FALLBACK)
+     * @return a TopicsPattern instance
+     */
+    public static TopicsPattern create(String regex, 
TopicsPattern.RegexImplementation implementation) {
+        return internalCreate(regex, TopicList.removeTopicDomainScheme(regex), 
implementation);
+    }
+
+    /**
+     * Creates a TopicsPattern that matches all topics.
+     *
+     * @return a TopicsPattern instance that matches all topics
+     */
+    public static TopicsPattern matchAll() {
+        return MatchAllTopicsPattern.INSTANCE;
+    }
+
+    private static TopicsPattern internalCreate(String inputPattern, String 
regexWithoutTopicDomainScheme,
+                                                
TopicsPattern.RegexImplementation implementation) {
+        if 
(TopicList.ALL_TOPICS_PATTERN.equals(regexWithoutTopicDomainScheme)) {
+            return matchAll();
+        }
+        switch (implementation) {
+            case RE2J:
+                return new RE2JTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
+            case JDK:
+                return new JDKTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
+            case RE2J_WITH_JDK_FALLBACK:
+                try {
+                    return new RE2JTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
+                } catch (com.google.re2j.PatternSyntaxException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Failed to compile regex pattern '{}' with 
RE2J, fallback to JDK",
+                                regexWithoutTopicDomainScheme, e);
+                    }
+                    // Fallback to JDK implementation if RE2J fails
+                    return new JDKTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
+                }
+            default:
+                throw new IllegalArgumentException("Unknown 
RegexImplementation: " + implementation);
+        }
+    }
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
index a62aa012c7f..ee14f345029 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.common.topics;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
-import com.google.re2j.Pattern;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Stream;
+import java.util.regex.Pattern;
 import org.testng.annotations.Test;
 
 public class TopicListTest {
@@ -44,12 +44,11 @@ public class TopicListTest {
 
         Pattern pattern1 = 
Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
         List<String> result1 = TopicList.filterTopics(topicsNames, pattern1);
-        assertTrue(result1.size() == 2 && result1.contains(topicName1) && 
result1.contains(topicName2));
+        assertThat(result1).containsExactly(topicName1, topicName2);
 
         Pattern pattern2 = 
Pattern.compile("persistent://my-property/my-ns/.*");
         List<String> result2 = TopicList.filterTopics(topicsNames, pattern2);
-        assertTrue(result2.size() == 4
-                && Stream.of(topicName1, topicName2, topicName3, 
topicName4).allMatch(result2::contains));
+        assertThat(result2).containsExactly(topicName1, topicName2, 
topicName3, topicName4);
     }
 
     @Test
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java
new file mode 100644
index 00000000000..c244c0bc1b3
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternFactoryTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import com.google.re2j.PatternSyntaxException;
+import java.util.regex.Pattern;
+import org.testng.annotations.Test;
+
+public class TopicsPatternFactoryTest {
+    public static final String DEFAULT_TOPICS_PATTERN = 
"tenant/namespace/test-topic-.*";
+    public static final String DEFAULT_TOPICS_PATTERN_WITH_DOMAIN = 
"persistent://" + DEFAULT_TOPICS_PATTERN;
+
+    @Test
+    public void testCreateWithJdkPattern() {
+        Pattern jdkPattern = Pattern.compile(DEFAULT_TOPICS_PATTERN);
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(jdkPattern);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    private static void assertTopicsPattern(TopicsPattern topicsPattern, 
boolean inputIncludesDomainScheme) {
+        assertNotNull(topicsPattern);
+        assertEquals(topicsPattern.inputPattern(),
+                inputIncludesDomainScheme ? DEFAULT_TOPICS_PATTERN_WITH_DOMAIN 
: DEFAULT_TOPICS_PATTERN);
+        assertTopicsPatternMatches(topicsPattern);
+    }
+
+    private static void assertTopicsPatternMatches(TopicsPattern 
topicsPattern) {
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-abc"));
+        assertFalse(topicsPattern.matches("tenant/namespace/other-topic"));
+    }
+
+    @Test
+    public void testCreateWithJdkPatternContainingTopicDomainScheme() {
+        Pattern jdkPattern = 
Pattern.compile(DEFAULT_TOPICS_PATTERN_WITH_DOMAIN);
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(jdkPattern);
+        assertTopicsPattern(topicsPattern, true);
+    }
+
+    @Test
+    public void testCreateWithStringAndRE2JImplementation() {
+        TopicsPattern topicsPattern = 
TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.RE2J);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringAndJDKImplementation() {
+        TopicsPattern topicsPattern = 
TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.JDK);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringAndRE2JWithJDKFallbackImplementation() {
+        TopicsPattern topicsPattern = 
TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN,
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertTopicsPattern(topicsPattern, false);
+    }
+
+    @Test
+    public void testCreateWithStringContainingTopicDomainScheme() {
+        TopicsPattern topicsPattern = 
TopicsPatternFactory.create(DEFAULT_TOPICS_PATTERN_WITH_DOMAIN,
+                TopicsPattern.RegexImplementation.JDK);
+        assertTopicsPattern(topicsPattern, true);
+    }
+
+    @Test
+    public void testCreateWithAllTopicsPattern() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(".*",
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertNotNull(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches(""));
+    }
+
+    @Test
+    public void testMatchAll() {
+        TopicsPattern topicsPattern = TopicsPatternFactory.matchAll();
+        assertNotNull(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test-topic-1"));
+        assertTrue(topicsPattern.matches(""));
+        
assertTrue(topicsPattern.matches("tenant/namespace/very-long-topic-name"));
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testCreateWithInvalidImplementation() {
+        TopicsPatternFactory.create(null, 
TopicsPattern.RegexImplementation.JDK);
+    }
+
+    @Test
+    public void testRE2JFallbackToJDK() {
+        // example of excluding topics that start with "test2-" or "test3-" or 
"other-", but require that the topic
+        // name ends with "-topic" or "-topic-" and any suffix
+        // This regex cannot be parsed by RE2J, so it should fall back to JDK 
regex
+        String complexRegex = 
"tenant/namespace/(?!(test2|test3|other)-).+-topic(-.*)?";
+        TopicsPattern topicsPattern = TopicsPatternFactory.create(complexRegex,
+                TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK);
+        assertNotNull(topicsPattern);
+        assertTopicsPatternMatches(topicsPattern);
+        assertTrue(topicsPattern.matches("tenant/namespace/any-topic"));
+        assertFalse(topicsPattern.matches("tenant/namespace/test2-topic"));
+        assertFalse(topicsPattern.matches("tenant/namespace/test3-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test4-topic"));
+        assertTrue(topicsPattern.matches("tenant/namespace/test20-topic"));
+    }
+
+    @Test(expectedExceptions = PatternSyntaxException.class)
+    public void testRE2JParsingFailure() {
+        // RE2J cannot parse the lookahead below; without JDK fallback it throws
+        String complexRegex = 
"tenant/namespace/(?!(test2|test3|other)-).+-topic(-.*)?";
+        TopicsPatternFactory.create(complexRegex, 
TopicsPattern.RegexImplementation.RE2J);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java
new file mode 100644
index 00000000000..68e54cc2a29
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicsPatternTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.regex.Pattern;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.Test;
+
+public class TopicsPatternTest {
+
+    @Test
+    public void testNamespace() {
+        TopicsPattern pattern = 
TopicsPatternFactory.create(Pattern.compile("tenant/namespace/topic-.*"));
+        NamespaceName namespace = pattern.namespace();
+        assertEquals(namespace.getTenant(), "tenant");
+        assertEquals(namespace.getLocalName(), "namespace");
+        assertEquals(namespace.toString(), "tenant/namespace");
+
+        // Test with a pattern that includes the topic domain scheme prefix
+        TopicsPattern domainPrefixPattern =
+                
TopicsPatternFactory.create(Pattern.compile("persistent://tenant/namespace/topic-.*"));
+        NamespaceName namespace2 = domainPrefixPattern.namespace();
+        assertEquals(namespace2.getTenant(), "tenant");
+        assertEquals(namespace2.getLocalName(), "namespace");
+        assertEquals(namespace2.toString(), "tenant/namespace");
+
+        // Test with a more complex topic pattern
+        TopicsPattern complexPattern =
+                
TopicsPatternFactory.create(Pattern.compile("persistent://my-tenant/my-namespace/prefix-.*-suffix"));
+        NamespaceName complexNamespace = complexPattern.namespace();
+        assertEquals(complexNamespace.getTenant(), "my-tenant");
+        assertEquals(complexNamespace.getLocalName(), "my-namespace");
+        assertEquals(complexNamespace.toString(), "my-tenant/my-namespace");
+
+        // Test with non-persistent topic pattern
+        TopicsPattern nonPersistentPattern =
+                
TopicsPatternFactory.create(Pattern.compile("non-persistent://test-tenant/test-namespace/.*"));
+        NamespaceName nonPersistentNamespace = 
nonPersistentPattern.namespace();
+        assertEquals(nonPersistentNamespace.getTenant(), "test-tenant");
+        assertEquals(nonPersistentNamespace.getLocalName(), "test-namespace");
+        assertEquals(nonPersistentNamespace.toString(), 
"test-tenant/test-namespace");
+    }
+
+    @Test
+    public void testTopicLookupNameForTopicListWatcherPlacement() {
+        TopicsPattern pattern =
+                
TopicsPatternFactory.create(Pattern.compile("persistent://tenant/namespace/topic-.*(\\d+)?"));
+        String lookupName = 
pattern.topicLookupNameForTopicListWatcherPlacement();
+        assertNotNull(lookupName);
+        TopicName lookupTopicName = TopicName.get(lookupName);
+        assertEquals(lookupTopicName.getTenant(), "tenant");
+        assertEquals(lookupTopicName.getNamespacePortion(), "namespace");
+        assertEquals(lookupTopicName.getLocalName(), "topic-.*(\\d+)?");
+    }
+}
\ No newline at end of file

Reply via email to