This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8652efa4d60 [feat][broker] PIP-469: Legacy-aware topic policies 
backend routing and metadata-store topic policies (#25707)
8652efa4d60 is described below

commit 8652efa4d60b791a5f1ee4e52f7ffda6ebbbb256
Author: Yunze Xu <[email protected]>
AuthorDate: Mon May 18 19:16:47 2026 +0800

    [feat][broker] PIP-469: Legacy-aware topic policies backend routing and 
metadata-store topic policies (#25707)
---
 pip/pip-469.md                                     |  37 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  11 +-
 .../pulsar/broker/service/AbstractTopic.java       |   2 +-
 .../service/LegacyAwareTopicPoliciesService.java   | 144 +++++++++++
 .../service/MetadataStoreTopicPoliciesService.java | 280 +++++++++++++++++++++
 .../SystemTopicBasedTopicPoliciesService.java      |   2 +-
 .../broker/service/TopicPoliciesService.java       |  15 +-
 .../broker/service/persistent/PersistentTopic.java |   8 +-
 .../admin/MetadataStoreTopicPoliciesTest.java      |  72 ++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 114 ++++++---
 .../LegacyAwareTopicPoliciesServiceTest.java       | 190 ++++++++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   4 +-
 .../broker/service/TopicPolicyTestUtils.java       |   7 +
 14 files changed, 852 insertions(+), 46 deletions(-)

diff --git a/pip/pip-469.md b/pip/pip-469.md
index 69036381f0f..4734adc127d 100644
--- a/pip/pip-469.md
+++ b/pip/pip-469.md
@@ -105,8 +105,15 @@ Broker startup validates both backends:
 - `SystemTopicBasedTopicPoliciesService` must be instantiable.
 - The configured `topicPoliciesServiceClassName` must be instantiable.
 
-If either backend cannot be instantiated or started, broker startup fails. 
There is no per-request fallback from one
-backend to another.
+`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It 
intentionally does not call
+`SystemTopicBasedTopicPoliciesService#start`, because that start path 
registers a namespace-bundle ownership listener
+whose only purpose is to eagerly create a reader on 
`<namespace>/__change_events` when a namespace bundle is loaded.
+Under legacy-aware routing, that eager optimization would be counterproductive 
because it can create readers for
+namespaces that do not have topic policies in `__change_events`. For legacy 
namespaces, the system-topic reader and
+policy cache are initialized lazily by the routed system-topic backend 
operations.
+
+If either backend cannot be instantiated, or if the configured backend cannot 
be started, broker startup fails. There is
+no per-request fallback from one backend to another.
 
 ### Namespace-scoped service routing
 
@@ -118,6 +125,10 @@ backend to another.
   the system-topic backend when the system topic exists.
 - Routing the same operations to the configured backend when the system topic 
does not exist.
 
+Listener registration is routed through 
`TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the
+namespace backend before registering the listener, and the listener is 
registered only on the selected backend instead
+of being registered on both backends.
+
 The system-topic existence check can be cached per namespace in memory, but 
the routing rule is defined by actual topic
 existence rather than by new namespace metadata.
 
@@ -137,9 +148,13 @@ meaning the system-topic-backed topic-policies state is 
gone.
 
 - Topic names are normalized to the partitioned topic name, so all partitions 
share the same topic-policies record.
 - Global policies are stored in the configuration metadata store path:
-  `/admin/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+  `/admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}`.
 - Local policies are stored in the local metadata store path:
-  
`/admin/local-policies/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+  `/admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+
+To avoid possible conflicts like the listener registered on the 
`/admin/local-policies` path from
+`BrokerService#handleMetadataChanges`, these two paths share the same root 
path `/admin/topic-policies`, which is not
+used by any other component.
 
 Each node stores a serialized `TopicPolicies` document. The backend writes and 
reads the two scopes independently:
 
@@ -159,6 +174,11 @@ managed-ledger metadata updates.
 
 ### Listener behavior
 
+`TopicPoliciesService` adds `registerListenerAsync(TopicName, 
TopicPolicyListener)` for listener registration. The
+existing synchronous `registerListener(TopicName, TopicPolicyListener)` method 
is retained as a deprecated compatibility
+hook for existing custom implementations, and the default async method 
delegates to it. Implementations that need async
+routing or initialization, such as `LegacyAwareTopicPoliciesService`, override 
`registerListenerAsync` directly.
+
 The backend registers watchers on both metadata stores:
 
 - A change on the local path re-reads the local node and notifies listeners 
with the latest local `TopicPolicies` or
@@ -173,6 +193,11 @@ append-only replay log; it relies on metadata-store 
notifications and read-after
 
 ### Public API
 
+The `TopicPoliciesService` extension point gains a default
+`CompletableFuture<Boolean> registerListenerAsync(TopicName, 
TopicPolicyListener)` method. Existing implementations
+remain compatible because `registerListener(TopicName, TopicPolicyListener)` 
is retained and used by the default async
+implementation.
+
 No new namespace policy field is introduced.
 
 No new namespace admin REST endpoint or Java admin client method is introduced.
@@ -221,6 +246,10 @@ This upgrade rule is intentionally conservative:
 This means some namespaces with an empty but already-created `__change_events` 
topic may continue using the
 system-topic backend. That is acceptable because it avoids missing legacy 
state.
 
+Existing custom `TopicPoliciesService` implementations that only implement the 
synchronous `registerListener` method
+continue to work through the default `registerListenerAsync` bridge. 
Implementations can override
+`registerListenerAsync` when registration itself needs asynchronous backend 
resolution or initialization.
+
 ## Downgrade / Rollback
 
 Rolling back to a broker version that does not understand legacy-aware routing 
returns topic-policies backend
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4c3651bf90b..5707c05af1b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1750,8 +1750,16 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_SERVER,
-            doc = "The class name of the topic policies service. The default 
config only takes affect when the "
-                    + "systemTopicEnable config is true"
+            doc = """
+                    The class name of the topic policies service. There are 2 
built-in implementations:
+                    1. 
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService" 
(default)
+                      It stores a topic's policies in the `__change_events` 
topic. If `systemTopicEnabled` is false,
+                      the topic policies will just be disabled
+                    2. 
"org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService"
+                      It stores a topic's policies in the metadata store. If 
`systemTopicEnabled` is true and the
+                      topic's namespace has a `__change_events` topic, the 
policies will still be stored in the
+                      `__change_events` topic for backward compatibility.
+                    """
     )
     private String topicPoliciesServiceClassName =
             
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4fe6196c05c..b200284797f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -107,6 +107,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.HealthChecker;
+import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
@@ -2293,8 +2294,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 return TopicPoliciesService.DISABLED;
             }
         }
-        return (TopicPoliciesService) Reflections.createInstance(className,
+        final var configuredService = (TopicPoliciesService) 
Reflections.createInstance(className,
                 Thread.currentThread().getContextClassLoader());
+        if (!config.isSystemTopicEnabled()) {
+            log.info()
+                    .attr("className", className)
+                    .log("System topic is disabled, using configured topic 
policies service without legacy routing");
+            return configuredService;
+        }
+        return new LegacyAwareTopicPoliciesService(this, new 
SystemTopicBasedTopicPoliciesService(this),
+                configuredService);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4d29c751f03..3ad9eb43810 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,7 +559,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
     protected void registerTopicPolicyListener() {
         brokerService.getPulsar().getTopicPoliciesService()
-                .registerListener(TopicName.getPartitionedTopicName(topic), 
this);
+                
.registerListenerAsync(TopicName.getPartitionedTopicName(topic), this);
     }
 
     protected void unregisterTopicPolicyListener() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
new file mode 100644
index 00000000000..20f7b207991
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Routes topic policy operations to the legacy system-topic backend when a 
namespace already has
+ * a topic-policy {@code __change_events} system topic, and otherwise to the 
configured backend.
+ */
+@CustomLog
+public class LegacyAwareTopicPoliciesService implements TopicPoliciesService {
+
+    private final AsyncLoadingCache<NamespaceName, Boolean> isLegacyNamespace;
+    @VisibleForTesting
+    final SystemTopicBasedTopicPoliciesService systemTopicService;
+    private final TopicPoliciesService configuredService;
+
+    public LegacyAwareTopicPoliciesService(PulsarService pulsar,
+                                           
SystemTopicBasedTopicPoliciesService systemTopicService,
+                                           TopicPoliciesService 
configuredService) {
+        // Generally, we only need to check if the __change_events topic 
exists once because the __change_events topic
+        // should only be created by broker before the upgrade, where 
`SystemTopicBasedTopicPoliciesService` is
+        // configured as the topic policies service.
+        this.isLegacyNamespace = 
Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1))
+                .buildAsync(new AsyncCacheLoader<>() {
+                    @NonNull
+                    @Override
+                    public CompletableFuture<? extends Boolean> 
asyncLoad(NamespaceName key,
+                                                                          
@NonNull Executor executor) {
+                        return 
NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key, 
EventType.TOPIC_POLICY,
+                                pulsar);
+                    }
+                });
+        this.systemTopicService = systemTopicService;
+        this.configuredService = configuredService;
+        if (configuredService instanceof SystemTopicBasedTopicPoliciesService) 
{
+            throw new IllegalArgumentException(
+                    "configuredService should not be an instance of 
SystemTopicBasedTopicPoliciesService");
+        }
+    }
+
+    @Override
+    public void start(PulsarService pulsarService) {
+        // We should not call `systemTopicService.start()`, which just 
registers a namespace bundle listener to create
+        // a reader on `<namespace>/__change_events` when the namespace's 
bundle is loaded firstly. It's just an
+        // optimization to create the reader before loading any topic. 
However, it could create a reader on a namespace
+        // that does not even have the __change_events topic.
+        configuredService.start(pulsarService);
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            configuredService.close();
+        } finally {
+            systemTopicService.close();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+        return resolveService(topicName.getNamespaceObject())
+                .thenCompose(service -> 
service.getTopicPoliciesAsync(topicName, type));
+    }
+
+    @Override
+    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName, boolean isGlobalPolicy,
+                                                            boolean 
skipUpdateWhenTopicPolicyDoesntExist,
+                                                            
Consumer<TopicPolicies> policyUpdater) {
+        return resolveService(topicName.getNamespaceObject())
+                .thenCompose(service -> 
service.updateTopicPoliciesAsync(topicName, isGlobalPolicy,
+                        skipUpdateWhenTopicPolicyDoesntExist, policyUpdater));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName) {
+        return resolveService(topicName.getNamespaceObject())
+                .thenCompose(service -> 
service.deleteTopicPoliciesAsync(topicName));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName,
+                                                            boolean 
keepGlobalPoliciesAfterDeleting) {
+        return resolveService(topicName.getNamespaceObject())
+                .thenCompose(service -> 
service.deleteTopicPoliciesAsync(topicName,
+                        keepGlobalPoliciesAfterDeleting));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> registerListenerAsync(TopicName 
topicName, TopicPolicyListener listener) {
+        return resolveService(topicName.getNamespaceObject())
+                .thenCompose(service -> 
service.registerListenerAsync(topicName, listener));
+    }
+
+    @Override
+    public boolean registerListener(TopicName topicName, TopicPolicyListener 
listener) {
+        throw new RuntimeException("should not be called");
+    }
+
+    @Override
+    public void unregisterListener(TopicName topicName, TopicPolicyListener 
listener) {
+        configuredService.unregisterListener(topicName, listener);
+        systemTopicService.unregisterListener(topicName, listener);
+    }
+
+    @VisibleForTesting
+    CompletableFuture<TopicPoliciesService> resolveService(NamespaceName 
namespace) {
+        return isLegacyNamespace.get(namespace)
+                .thenApply(isLegacy -> isLegacy ? systemTopicService : 
configuredService);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
new file mode 100644
index 00000000000..56319a44ac3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Topic policies service backed by Pulsar metadata stores.
+ */
+@CustomLog
+public class MetadataStoreTopicPoliciesService implements TopicPoliciesService 
{
+
+    public static final String GLOBAL_POLICIES_ROOT = 
"/admin/topic-policies/global";
+    public static final String LOCAL_POLICIES_ROOT = 
"/admin/topic-policies/local";
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Map<TopicName, List<TopicPolicyListener>> listeners = new 
ConcurrentHashMap<>();
+    private MetadataCache<TopicPolicies> localPoliciesCache;
+    private MetadataCache<TopicPolicies> globalPoliciesCache;
+
+    @Override
+    public void start(PulsarService pulsar) {
+        MetadataStore localStore = pulsar.getLocalMetadataStore();
+        MetadataStore configurationStore = 
pulsar.getConfigurationMetadataStore();
+        this.localPoliciesCache = 
localStore.getMetadataCache(TopicPolicies.class);
+        this.globalPoliciesCache = 
configurationStore.getMetadataCache(TopicPolicies.class);
+        localStore.registerListener(notification -> 
handleNotification(notification, false));
+        configurationStore.registerListener(notification -> 
handleNotification(notification, true));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName) {
+        return deleteTopicPoliciesAsync(topicName, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName,
+                                                            boolean 
keepGlobalPoliciesAfterDeleting) {
+        TopicName partitionedTopicName = normalizeTopicName(topicName);
+        if 
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
 {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException(getClass().getName() + " is closed."));
+        }
+        CompletableFuture<Void> deleteLocal =
+                deleteIfExists(localPoliciesCache, 
pathFor(partitionedTopicName, false));
+        if (keepGlobalPoliciesAfterDeleting) {
+            return deleteLocal;
+        }
+        CompletableFuture<Void> deleteGlobal =
+                deleteIfExists(globalPoliciesCache, 
pathFor(partitionedTopicName, true));
+        return CompletableFuture.allOf(deleteLocal, deleteGlobal);
+    }
+
+    @Override
+    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName, boolean isGlobalPolicy,
+                                                            boolean 
skipUpdateWhenTopicPolicyDoesntExist,
+                                                            
Consumer<TopicPolicies> policyUpdater) {
+        TopicName partitionedTopicName = normalizeTopicName(topicName);
+        if 
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
 {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                    "Not allowed to update topic policy for the heartbeat 
topic"));
+        }
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException(getClass().getName() + " is closed."));
+        }
+        MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy);
+        String path = pathFor(partitionedTopicName, isGlobalPolicy);
+        CompletableFuture<TopicPolicies> updateFuture;
+        if (skipUpdateWhenTopicPolicyDoesntExist) {
+            updateFuture = cache.readModifyUpdate(path,
+                    current -> updatePolicies(Optional.of(current), 
isGlobalPolicy, policyUpdater));
+        } else {
+            updateFuture = cache.readModifyUpdateOrCreate(path,
+                    current -> updatePolicies(current, isGlobalPolicy, 
policyUpdater));
+        }
+        return updateFuture.thenAccept(__ -> { }).exceptionally(error -> {
+            if (skipUpdateWhenTopicPolicyDoesntExist
+                    && FutureUtil.unwrapCompletionException(error) instanceof 
NotFoundException) {
+                return null;
+            }
+            throw FutureUtil.wrapToCompletionException(error);
+        });
+    }
+
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+        TopicName partitionedTopicName = normalizeTopicName(topicName);
+        if 
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
 {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        if (closed.get()) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        boolean global = type == GetType.GLOBAL_ONLY;
+        return cache(global).get(pathFor(partitionedTopicName, global))
+                .thenApply(policies -> policies.map(policy -> 
cloneWithScope(policy, global)));
+    }
+
+    @Override
+    public boolean registerListener(TopicName topicName, TopicPolicyListener 
listener) {
+        listeners.compute(normalizeTopicName(topicName), (__, topicListeners) 
-> {
+            if (topicListeners == null) {
+                topicListeners = new CopyOnWriteArrayList<>();
+            }
+            topicListeners.add(listener);
+            return topicListeners;
+        });
+        return true;
+    }
+
+    @Override
+    public void unregisterListener(TopicName topicName, TopicPolicyListener 
listener) {
+        listeners.computeIfPresent(normalizeTopicName(topicName), (__, 
topicListeners) -> {
+            topicListeners.remove(listener);
+            return topicListeners.isEmpty() ? null : topicListeners;
+        });
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            listeners.clear();
+            if (localPoliciesCache != null) {
+                localPoliciesCache.invalidateAll();
+            }
+            if (globalPoliciesCache != null) {
+                globalPoliciesCache.invalidateAll();
+            }
+        }
+    }
+
+    private MetadataCache<TopicPolicies> cache(boolean isGlobalPolicy) {
+        return isGlobalPolicy ? globalPoliciesCache : localPoliciesCache;
+    }
+
+    private CompletableFuture<Void> 
deleteIfExists(MetadataCache<TopicPolicies> cache, String path) {
+        return cache.delete(path).handle((__, error) -> {
+            cache.invalidate(path);
+            if (error == null || FutureUtil.unwrapCompletionException(error) 
instanceof NotFoundException) {
+                return null;
+            }
+            throw FutureUtil.wrapToCompletionException(error);
+        });
+    }
+
+    private static TopicPolicies updatePolicies(Optional<TopicPolicies> 
currentPolicies,
+                                                boolean isGlobalPolicy,
+                                                Consumer<TopicPolicies> 
policyUpdater) {
+        TopicPolicies policies = 
currentPolicies.map(TopicPolicies::clone).orElseGet(TopicPolicies::new);
+        policies.setIsGlobal(isGlobalPolicy);
+        policyUpdater.accept(policies);
+        return policies;
+    }
+
+    private void handleNotification(Notification notification, boolean 
isGlobalPolicy) {
+        if (closed.get()
+                || (notification.getType() != NotificationType.Created
+                && notification.getType() != NotificationType.Modified
+                && notification.getType() != NotificationType.Deleted)) {
+            return;
+        }
+        String path = notification.getPath();
+        String root = isGlobalPolicy ? GLOBAL_POLICIES_ROOT : 
LOCAL_POLICIES_ROOT;
+        Optional<TopicName> topicName = topicNameFromPath(root, path);
+        if (topicName.isEmpty()) {
+            return;
+        }
+        MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy);
+        cache.invalidate(path);
+        if (notification.getType() == NotificationType.Deleted) {
+            notifyListeners(topicName.get(), null);
+            return;
+        }
+        cache.get(path).whenComplete((policies, error) -> {
+            if (error != null) {
+                log.warn()
+                        .attr("path", path)
+                        .exception(error)
+                        .log("Failed to refresh topic policies after metadata 
notification");
+                return;
+            }
+            notifyListeners(topicName.get(),
+                    policies.map(policy -> cloneWithScope(policy, 
isGlobalPolicy)).orElse(null));
+        });
+    }
+
+    private void notifyListeners(TopicName topicName, @Nullable TopicPolicies 
policies) {
+        List<TopicPolicyListener> topicListeners = listeners.get(topicName);
+        if (topicListeners == null) {
+            return;
+        }
+        for (TopicPolicyListener listener : topicListeners) {
+            try {
+                listener.onUpdate(policies == null ? null : policies.clone());
+            } catch (Throwable error) {
+                log.error().attr("topic", 
topicName).exception(error).log("Call topic policy listener error");
+            }
+        }
+    }
+
+    private static TopicName normalizeTopicName(TopicName topicName) {
+        return TopicName.get(topicName.getPartitionedTopicName());
+    }
+
+    private static TopicPolicies cloneWithScope(TopicPolicies policies, 
boolean isGlobalPolicy) {
+        TopicPolicies cloned = policies.clone();
+        cloned.setIsGlobal(isGlobalPolicy);
+        return cloned;
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesDirectFromStore(TopicName topicName,
+                                                                               
       boolean isGlobal) {
+        String path = pathFor(topicName, isGlobal);
+        MetadataCache<TopicPolicies> c = cache(isGlobal);
+        c.invalidate(path);
+        return c.get(path).thenApply(opt -> opt.map(p -> cloneWithScope(p, 
isGlobal)));
+    }
+
+    @VisibleForTesting
+    static String pathFor(TopicName topicName, boolean isGlobalPolicy) {
+        TopicName partitionedTopicName = normalizeTopicName(topicName);
+        return (isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT)
+                + "/" + partitionedTopicName.getTenant()
+                + "/" + partitionedTopicName.getNamespacePortion()
+                + "/" + partitionedTopicName.getDomain()
+                + "/" + partitionedTopicName.getEncodedLocalName();
+    }
+
+    @VisibleForTesting
+    private static Optional<TopicName> topicNameFromPath(String root, String 
path) {
+        if (!path.startsWith(root + "/")) {
+            return Optional.empty();
+        }
+        String[] parts = path.substring(root.length() + 1).split("/", 4);
+        if (parts.length != 4) {
+            return Optional.empty();
+        }
+        return Optional.of(TopicName.get(parts[2], parts[0], parts[1], 
Codec.decode(parts[3])));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 48653883d1a..a8f37d0c389 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -667,7 +667,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         return systemTopicClient.newReaderAsync();
     }
 
-    private void removeOwnedNamespaceBundleAsync(NamespaceBundle 
namespaceBundle) {
+    void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.isHeartbeatNamespace(namespace)) {
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 803a18da72d..5b5fe157230 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -96,6 +96,15 @@ public interface TopicPoliciesService extends AutoCloseable {
     default void close() throws Exception {
     }
 
+
+    /**
+     * @implNote This method is never called unless by the default 
implementation of
+     * {@link TopicPoliciesService#registerListenerAsync(TopicName, 
TopicPolicyListener)}, which is actually called
+     * internally. This method is only retained for backward compatibility on 
custom implementations.
+     */
+    @Deprecated
+    boolean registerListener(TopicName topicName, TopicPolicyListener 
listener);
+
     /**
      * Registers a listener for topic policies updates.
      *
@@ -106,10 +115,10 @@ public interface TopicPoliciesService extends 
AutoCloseable {
      * guaranteed to be received by the listener.
      * In summary, the listener is guaranteed to receive only the latest value.
      * </p>
-     *
-     * @return true if the listener is registered successfully
      */
-    boolean registerListener(TopicName topicName, TopicPolicyListener 
listener);
+    default CompletableFuture<Boolean> registerListenerAsync(TopicName 
topicName, TopicPolicyListener listener) {
+        return CompletableFuture.completedFuture(registerListener(topicName, 
listener));
+    }
 
     /**
      * Unregister the topic policies listener.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a4ef9a564fc..c9bcad341fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4904,7 +4904,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     protected CompletableFuture<Void> initTopicPolicy() {
         final var topicPoliciesService = 
brokerService.pulsar().getTopicPoliciesService();
         final var partitionedTopicName = 
TopicName.getPartitionedTopicName(topic);
-        if (topicPoliciesService.registerListener(partitionedTopicName, this)) 
{
+        return 
topicPoliciesService.registerListenerAsync(partitionedTopicName, 
this).thenCompose(registered -> {
+            if (!registered) {
+                return CompletableFuture.completedFuture(null);
+            }
             if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
                 return CompletableFuture.completedFuture(null);
             }
@@ -4916,8 +4919,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     TopicPoliciesService.GetType.LOCAL_ONLY))
             .thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
                             brokerService.getTopicOrderedExecutor());
-        }
-        return CompletableFuture.completedFuture(null);
+        });
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
new file mode 100644
index 00000000000..e7fefa16497
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class MetadataStoreTopicPoliciesTest extends TopicPoliciesTest {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        
conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+        super.setup();
+    }
+
+    @Override
+    protected void clearTopicPoliciesCache() {
+    }
+
+    @Test(enabled = false)
+    @Override
+    public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws 
Exception {
+        // This test is specific to SystemTopicBasedTopicPoliciesService (uses 
getPoliciesCacheInit).
+        // Not applicable to MetadataStoreTopicPoliciesService.
+    }
+
+    @Test(enabled = false)
+    @Override
+    public void testSystemTopicShouldBeCompacted() throws Exception {
+        // Relies on __change_events system topic, which does not exist with 
MetadataStoreTopicPoliciesService.
+    }
+
+    @Test(enabled = false)
+    @Override
+    public void testPoliciesCanBeDeletedWithTopic() throws Exception {
+        // Directly accesses __change_events PersistentTopic for compaction.
+        // Not applicable to MetadataStoreTopicPoliciesService.
+    }
+
+    @Test(enabled = false)
+    @Override
+    public void testProduceChangesWithEncryptionRequired() throws Exception {
+        // Checks __change_events LAC, which does not exist with 
MetadataStoreTopicPoliciesService.
+    }
+
+    @Test(enabled = false)
+    @Override
+    public void testTopicPoliciesAfterCompaction(String reloadPolicyType) 
throws Exception {
+        // The "Recreate_Service" variant creates a new 
SystemTopicBasedTopicPoliciesService,
+        // which is not applicable to MetadataStoreTopicPoliciesService.
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index d94aa51b73b..5ba979534ef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -119,7 +119,9 @@ import org.glassfish.jersey.client.JerseyClient;
 import org.glassfish.jersey.client.JerseyClientBuilder;
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -144,10 +146,11 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     private final int testTopicPartitions = 2;
 
-    @BeforeMethod
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         this.conf.setDefaultNumberOfNamespaceBundles(1);
+        this.conf.setForceDeleteNamespaceAllowed(true);
         super.internalSetup();
 
         admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -156,17 +159,48 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace(testTenant + "/" + testNamespace, 
Set.of("test"));
         admin.namespaces().createNamespace(myNamespaceV1);
         admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
-        Producer<?> producer = 
pulsarClient.newProducer().topic(testTopic).create();
-        producer.close();
-        waitForZooKeeperWatchers();
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @BeforeMethod
+    void setupTestTopic() throws Exception {
+        // Recreate namespace to clear any policies set by previous tests
+        try {
+            admin.topics().deletePartitionedTopic(testTopic, true);
+        } catch (PulsarAdminException.NotFoundException e) {
+            // topic may already be deleted
+        }
+        try {
+            admin.namespaces().deleteNamespace(myNamespace, true);
+        } catch (PulsarAdminException.NotFoundException e) {
+            // namespace may already be deleted
+        }
+        try {
+            admin.namespaces().deleteNamespace(myNamespaceV1, true);
+        } catch (PulsarAdminException.NotFoundException e) {
+            // namespace may already be deleted
+        }
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, 
Set.of("test"));
+        admin.namespaces().createNamespace(myNamespaceV1);
+        admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
+        // Acquire namespace bundle ownership so tests that call 
getOrCreateTopic() directly succeed.
+        // Without this, services that don't create a __change_events reader 
(e.g. MetadataStoreTopicPoliciesService)
+        // leave the bundle unowned after namespace recreation and the first 
broker-side topic load fails.
+        admin.lookups().lookupTopic(testTopic + "-partition-0");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void afterMethodCleanup() throws Exception{
+        
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", 
"0");
+        
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", 
"0");
+        clearTopicPoliciesCache();
+    }
+
     @Test
     public void updatePropertiesForAutoCreatedTopicTest() throws Exception {
         TopicName topicName = TopicName.get(
@@ -519,8 +553,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(dataProvider = "clientRequestType")
     public void testPriorityOfGlobalPolicies(String clientRequestType) throws 
Exception {
-        final SystemTopicBasedTopicPoliciesService topicPoliciesService =
-                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final TopicPoliciesService topicPoliciesService =
+                pulsar.getTopicPoliciesService();
         final JerseyClient httpClient = JerseyClientBuilder.createClient();
         // create topic and load it up.
         final String namespace = myNamespace;
@@ -600,8 +634,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(dataProvider = "clientRequestType")
     public void testPriorityOfGlobalPolicies2(String clientRequestType) throws 
Exception {
-        final SystemTopicBasedTopicPoliciesService topicPoliciesService =
-                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final TopicPoliciesService topicPoliciesService =
+                pulsar.getTopicPoliciesService();
         final JerseyClient httpClient = JerseyClientBuilder.createClient();
         // create topic and load it up.
         final String namespace = myNamespace;
@@ -687,8 +721,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final TopicName topicName = TopicName.get(topic);
         admin.topics().createNonPartitionedTopic(topic);
         pulsarClient.newProducer().topic(topic).create().close();
-        final SystemTopicBasedTopicPoliciesService topicPoliciesService =
-                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final TopicPoliciesService topicPoliciesService =
+                pulsar.getTopicPoliciesService();
 
         // Set non-global policy of the limitation of max consumers.
         // Set global policy of the limitation of max producers.
@@ -729,8 +763,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         final TopicName topicName = TopicName.get(topic);
         admin.topics().createNonPartitionedTopic(topic);
         pulsarClient.newProducer().topic(topic).create().close();
-        final SystemTopicBasedTopicPoliciesService topicPoliciesService =
-                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final TopicPoliciesService topicPoliciesService =
+                pulsar.getTopicPoliciesService();
 
         // Set non-global policy of the limitation of max consumers.
         // Set global policy of the persistence policies.
@@ -2756,10 +2790,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testPublishRateInDifferentLevelPolicy() throws Exception {
-        cleanup();
-        conf.setMaxPublishRatePerTopicInMessages(5);
-        conf.setMaxPublishRatePerTopicInBytes(50L);
-        setup();
+        
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", 
"5");
+        
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", 
"50");
 
         final String topicName = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         pulsarClient.newProducer().topic(topicName).create().close();
@@ -3050,9 +3082,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testMaxUnackedMessagesOnSubscriptionPriority() throws 
Exception {
-        cleanup();
-        conf.setMaxUnackedMessagesPerSubscription(30);
-        setup();
+        restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(30));
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache
         @Cleanup
@@ -3115,6 +3145,9 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 && 
admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic) == null);
         messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
         assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker);
+
+        // restore default config
+        restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(4 * 
50000));
     }
 
     private void produceMsg(Producer<byte[]> producer, int msgNum) throws 
Exception{
@@ -3299,14 +3332,16 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 30000)
     public void testAutoCreationDisabled() throws Exception {
-        cleanup();
-        conf.setAllowAutoTopicCreation(false);
-        setup();
+        admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", 
"false");
+
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
         pulsarClient.newProducer().topic(topic).create().close();
         //should not fail
         assertNull(admin.topicPolicies().getMessageTTL(topic));
+
+        // restore default
+        admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", 
"true");
     }
 
     @SuppressWarnings("deprecation")
@@ -3431,6 +3466,12 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.newConsumer().topic(topic)
                 
.subscriptionType(SubscriptionType.Shared).subscriptionName("test")
                 .subscribe().close();
+
+        // restore dynamic broker config and conf object
+        pulsar.getConfiguration().setSubscriptionTypesEnabled(
+                Set.of("Exclusive", "Shared", "Failover", "Key_Shared"));
+        admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled",
+                "Exclusive,Shared,Failover,Key_Shared");
     }
 
     @Test(timeOut = 20000)
@@ -3765,7 +3806,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
+    public void testDoNotCreateSystemTopicForHeartbeatNamespace() throws 
Exception {
+        initEventsTopicAndPartitions();
         assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
         pulsar.getBrokerService().getTopics().forEach((k, v) -> {
             TopicName topicName = TopicName.get(k);
@@ -3826,8 +3868,13 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void triggerAndWaitNewTopicCompaction(String topicName) throws 
Exception {
-        PersistentTopic tp =
-                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        Optional<Topic> topicOpt =
+                pulsar.getBrokerService().getTopic(topicName, false).join();
+        if (topicOpt.isEmpty()) {
+            // Topic doesn't exist (e.g., when not using system-topic-based 
policies service), nothing to compact.
+            return;
+        }
+        PersistentTopic tp = (PersistentTopic) topicOpt.get();
         // Wait for the old task finish.
         Awaitility.await().untilAsserted(() -> {
             CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
@@ -3846,7 +3893,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
      * It is not a thread safety method, something will go to a wrong pointer 
if there is a task is trying to load a
      * topic policies.
      */
-    private void clearTopicPoliciesCache() {
+    protected void clearTopicPoliciesCache() {
         TopicPoliciesService topicPoliciesService = 
pulsar.getTopicPoliciesService();
         if (topicPoliciesService instanceof 
TopicPoliciesService.TopicPoliciesServiceDisabled) {
             return;
@@ -4076,8 +4123,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                         .isNull());
         admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1,
                 2));
-        SystemTopicBasedTopicPoliciesService topicPoliciesService =
-                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        TopicPoliciesService topicPoliciesService =
+                pulsar.getTopicPoliciesService();
 
         // check global topic policies can be added correctly.
         Awaitility.await().untilAsserted(() -> assertNotNull(
@@ -4121,6 +4168,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testMaxMessageSizeWithChunking() throws Exception {
+        final var maxMessageSize = this.conf.getMaxMessageSize();
         this.conf.setMaxMessageSize(1000);
 
         @Cleanup
@@ -4149,6 +4197,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
         // chunk message send success
         producer.send(new byte[2000]);
+        this.conf.setMaxMessageSize(maxMessageSize);
     }
 
     @Test(timeOut = 30000)
@@ -4202,6 +4251,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testProduceChangesWithEncryptionRequired() throws Exception {
+        initEventsTopicAndPartitions();
         final String beforeLac = 
admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
         admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
         // just an update to trigger writes on __change_events
@@ -4657,4 +4707,10 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) 
(1024 * 1024 * 10L),
                 "Should inherit offload threshold from legacy namespace 
policy");
     }
+
+    private void initEventsTopicAndPartitions() throws Exception {
+        try (Producer<?> producer = 
pulsarClient.newProducer().topic(testTopic).create()) {
+            // No-op. Creating the producer initializes the events topic and 
partitions.
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
new file mode 100644
index 00000000000..47a7de0528d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test order: testUpgrade() -> other tests (with 
MetadataStoreTopicPoliciesService configured) -> testDowngrade().
+ */
+@Test(groups = "broker")
+public class LegacyAwareTopicPoliciesServiceTest extends 
MockedPulsarServiceBaseTest {
+
+    private static final String metaNamespace = "public/meta-ns";
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.setupDefaultTenantAndNamespace();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(priority = -1)
+    public void testUpgrade() throws Exception {
+        final var topic = "test-upgrade";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setCompactionThreshold(topic, 100);
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100));
+
+        restartBroker(conf -> {
+            conf.setSystemTopicEnabled(false);
+            
conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+        });
+        // The policies will be lost because when system topic is disabled, it 
will not try to read policies from the
+        // __change_events topic
+        assertNull(admin.topicPolicies().getCompactionThreshold(topic));
+
+        restartBroker(conf -> conf.setSystemTopicEnabled(true));
+        // The default namespace still read policies from the __change_events 
topic if it exists
+        assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100);
+        
assertFalse(pulsar.getLocalMetadataStore().exists(MetadataStoreTopicPoliciesService.LOCAL_POLICIES_ROOT).get());
+
+        // The global policies are still stored in the __change_events topic
+        admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies(true).getCompactionThreshold(topic), 200));
+        assertFalse(pulsar.getConfigurationMetadataStore()
+                
.exists(MetadataStoreTopicPoliciesService.GLOBAL_POLICIES_ROOT).get());
+
+        admin.topicPolicies().deleteTopicPolicies(topic);
+        waitUntilAssert(() -> 
assertNull(admin.topicPolicies().getCompactionThreshold(topic)));
+
+        admin.namespaces().createNamespace(metaNamespace);
+    }
+
+    @Test(priority = 1)
+    public void testDowngrade() throws Exception {
+        final var topic1 = "downgrade"; // in default namespace
+        admin.topics().createNonPartitionedTopic(topic1);
+        admin.topicPolicies().setCompactionThreshold(topic1, 1);
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1));
+
+        final var topic2 = metaNamespace + "/downgrade";
+        admin.topics().createNonPartitionedTopic(topic2);
+        admin.topicPolicies().setCompactionThreshold(topic2, 2);
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies().getCompactionThreshold(topic2), 2));
+
+        restartBroker(conf ->
+                
conf.setTopicPoliciesServiceClassName(SystemTopicBasedTopicPoliciesService.class.getName()));
+        assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1);
+        // The policies will be lost because they are not stored in the 
__change_events topic
+        assertNull(admin.topicPolicies().getCompactionThreshold(topic2));
+    }
+
+    @DataProvider
+    public Object[][] namespaces() {
+        return new Object[][] {
+                { "public/default" },
+                { metaNamespace }
+        };
+    }
+
+    @Test(dataProvider = "namespaces")
+    public void testPoliciesOperations(String namespace) throws Exception {
+        final var topicName = TopicName.get(namespace + 
"/test-policies-operations");
+        final var topic = topicName.toString();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        final var compactionThreshold = new AtomicLong(0);
+        // Verify the exception thrown from one listener does not affect other 
listeners
+        pulsar.getTopicPoliciesService().registerListenerAsync(topicName, __ 
-> {
+            throw new RuntimeException("injected failure");
+        }).get();
+        pulsar.getTopicPoliciesService().registerListenerAsync(topicName, 
policies ->
+                
Optional.ofNullable(policies).map(TopicPolicies::getCompactionThreshold).ifPresentOrElse(
+                        compactionThreshold::set, () -> 
compactionThreshold.set(-1))).get();
+
+        // Verify Created events are handled
+        admin.topicPolicies(false).setCompactionThreshold(topic, 100);
+        waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 100));
+        final var localStore = pulsar.getLocalMetadataStore();
+        final var configurationStore = pulsar.getConfigurationMetadataStore();
+
+        if (namespace.equals(metaNamespace)) {
+            
assertTrue(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
 false)).get());
+            
assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
 true)).get());
+        }
+
+        admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+        waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 200));
+        if (namespace.equals(metaNamespace)) {
+            
assertTrue(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
 true)).get());
+        }
+
+        // Verify Modified events are handled
+        admin.topicPolicies(false).setCompactionThreshold(topic, 300);
+        waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 300));
+
+        admin.topicPolicies(true).setCompactionThreshold(topic, 400);
+        waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 400));
+
+        final var readerNamespaces = ((LegacyAwareTopicPoliciesService) 
pulsar.getTopicPoliciesService())
+                .systemTopicService.getReaderCaches().keySet();
+        
assertFalse(readerNamespaces.contains(NamespaceName.get(metaNamespace)));
+
+        // Verify Deleted events are handled
+        admin.topicPolicies(false).deleteTopicPolicies(topic);
+        waitUntilAssert(() -> assertEquals(compactionThreshold.get(), -1));
+        if (namespace.equals(metaNamespace)) {
+            
assertFalse(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
 false)).get());
+            
assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
 true)).get());
+        }
+    }
+
+    @Test
+    public void testUserCreatedEventsTopicAreIgnored() throws Exception {
+        final var topic = TopicName.get(metaNamespace + "/" + 
System.currentTimeMillis()).toString();
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setCompactionThreshold(topic, 1);
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+
+        final var eventsTopic = metaNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        admin.topics().createNonPartitionedTopic(eventsTopic);
+        // Even if the __change_events topic is created, since it has detected 
the namespace didn't have the events
+        // topic before, it will be ignored and the policies are still read 
from metadata store.
+        waitUntilAssert(() -> 
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+        admin.topics().delete(eventsTopic);
+    }
+
+    private static void waitUntilAssert(ThrowingRunnable assertion) {
+        
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(assertion);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index d02154c8178..2e8ca8cebec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -109,7 +109,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         CompletableFuture<Void> f = 
CompletableFuture.completedFuture(null).thenRunAsync(() -> {
             for (int i = 0; i < 100; i++) {
                 TopicPolicyListener listener = new TopicPolicyListenerImpl();
-                
systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+                
systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
                 
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
                 
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
 >= 1);
                 
systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
@@ -118,7 +118,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
 
         for (int i = 0; i < 100; i++) {
             TopicPolicyListener listener = new TopicPolicyListenerImpl();
-            systemTopicBasedTopicPoliciesService.registerListener(topicName, 
listener);
+            
systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
             
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
             
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
 >= 1);
             systemTopicBasedTopicPoliciesService.unregisterListener(topicName, 
listener);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 6b9735d59b2..7e9c697fb5d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -72,6 +72,13 @@ public class TopicPolicyTestUtils {
     public static Optional<TopicPolicies> 
getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService,
                                                                       
TopicName topicName, boolean isGlobal)
             throws Exception {
+        if (topicPoliciesService instanceof LegacyAwareTopicPoliciesService 
legacyService) {
+            TopicPoliciesService resolved = 
legacyService.resolveService(topicName.getNamespaceObject()).get();
+            return getTopicPoliciesBypassCache(resolved, topicName, isGlobal);
+        }
+        if (topicPoliciesService instanceof MetadataStoreTopicPoliciesService 
metadataStoreService) {
+            return 
metadataStoreService.getTopicPoliciesDirectFromStore(topicName, isGlobal).get();
+        }
         @Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService) 
topicPoliciesService)
                 .getNamespaceEventsSystemTopicFactory()
                 
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject())

Reply via email to