This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66624713da7 [fix][broker]Non-global topic policies and global topic 
policies overwrite each other (#24286)
66624713da7 is described below

commit 66624713da79061dee455f0a1fd82b5fa8e9ff4b
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 21 23:58:21 2025 +0800

    [fix][broker]Non-global topic policies and global topic policies overwrite 
each other (#24286)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  61 ++++---
 .../broker/service/TopicPoliciesService.java       |  37 ++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 186 +++++++++++++++++++++
 .../NamespaceEventsSystemTopicServiceTest.java     |   4 +-
 4 files changed, 261 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 61960311e03..4745decf58d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
@@ -61,6 +62,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -175,7 +177,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
-                                                         TopicPolicies 
policies) {
+                                                         @Nullable 
TopicPolicies policies) {
         return pulsarService.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
                 .thenCompose(namespacePolicies -> {
@@ -196,10 +198,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
                                     result.completeExceptionally(cause);
                                 } else {
-                                    PulsarEvent event = 
getPulsarEvent(topicName, actionType, policies);
-                                    CompletableFuture<MessageId> writeFuture = 
ActionType.DELETE.equals(actionType)
-                                                    ? 
writer.deleteAsync(getEventKey(event), event)
-                                                    : 
writer.writeAsync(getEventKey(event), event);
+                                    CompletableFuture<MessageId> writeFuture =
+                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
                                     writeFuture.whenComplete((messageId, e) -> 
{
                                         if (e != null) {
                                             result.completeExceptionally(e);
@@ -218,6 +218,25 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 });
     }
 
+    private CompletableFuture<MessageId> 
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
+                                      SystemTopicClient.Writer<PulsarEvent> 
writer,
+                                      @Nullable TopicPolicies policies) {
+        PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
+        if (!ActionType.DELETE.equals(actionType)) {
+            return writer.writeAsync(getEventKey(event, policies != null && 
policies.isGlobalPolicies()), event);
+        }
+        // When a topic is deleting, delete both non-global and global 
topic-level policies.
+        CompletableFuture<MessageId> deletePolicies = 
writer.deleteAsync(getEventKey(event, true), event)
+            .thenCompose(__ -> {
+                return writer.deleteAsync(getEventKey(event, false), event);
+            });
+        deletePolicies.exceptionally(ex -> {
+            log.error("Failed to delete topic policy [{}] error.", topicName, 
ex);
+            return null;
+        });
+        return deletePolicies;
+    }
+
     private PulsarEvent getPulsarEvent(TopicName topicName, ActionType 
actionType, TopicPolicies policies) {
         PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
         if (policies == null || !policies.isGlobalPolicies()) {
@@ -241,7 +260,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void notifyListener(Message<PulsarEvent> msg) {
         // delete policies
         if (msg.getValue() == null) {
-            TopicName topicName =  
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
+            TopicName topicName = 
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+                    .getPartitionedTopicName());
             if (listeners.get(topicName) != null) {
                 for (TopicPolicyListener listener : listeners.get(topicName)) {
                     try {
@@ -552,8 +572,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
         // delete policies
         if (msg.getValue() == null) {
-            TopicName topicName = 
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
-            if (hasReplicateTo(msg)) {
+            boolean isGlobalPolicy = TopicPoliciesService.isGlobalPolicy(msg);
+            TopicName topicName = 
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+                    .getPartitionedTopicName());
+            if (isGlobalPolicy) {
                 globalPoliciesCache.remove(topicName);
             } else {
                 policiesCache.remove(topicName);
@@ -593,14 +615,15 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     }
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
-                    systemTopicClient.newWriterAsync().thenAccept(writer
-                            -> writer.deleteAsync(getEventKey(topicName),
-                                    getPulsarEvent(topicName, 
ActionType.DELETE, null))
-                            .whenComplete((result, e) -> 
writer.closeAsync().whenComplete((res, ex) -> {
+                    systemTopicClient.newWriterAsync().thenAccept(writer -> {
+                        sendTopicPolicyEventInternal(topicName, 
ActionType.DELETE, writer, event.getPolicies())
+                            .whenComplete((result, e) -> writer.closeAsync()
+                            .whenComplete((res, ex) -> {
                                 if (ex != null) {
                                     log.error("close writer failed ", ex);
                                 }
-                            })));
+                            }));
+                    });
                     break;
                 case NONE:
                     break;
@@ -642,19 +665,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    public static String getEventKey(PulsarEvent event) {
-        return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
-                event.getTopicPoliciesEvent().getTenant(),
-                event.getTopicPoliciesEvent().getNamespace(),
-                event.getTopicPoliciesEvent().getTopic()).toString();
-    }
 
-    public static String getEventKey(TopicName topicName) {
-        return TopicName.get(topicName.getDomain().toString(),
-                topicName.getTenant(),
-                topicName.getNamespace(),
-                
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
-    }
 
     @VisibleForTesting
     long getPoliciesCacheSize() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 4b7ed3765bb..ec5da7995bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.broker.service;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -34,6 +36,8 @@ import org.apache.pulsar.common.util.FutureUtil;
 @InterfaceAudience.LimitedPrivate
 public interface TopicPoliciesService extends AutoCloseable {
 
+    String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__";
+
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
 
     /**
@@ -123,4 +127,37 @@ public interface TopicPoliciesService extends 
AutoCloseable {
             //No-op
         }
     }
+
+    static String getEventKey(PulsarEvent event, boolean isGlobal) {
+        return 
wrapEventKey(TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+            event.getTopicPoliciesEvent().getTenant(),
+            event.getTopicPoliciesEvent().getNamespace(),
+            event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal);
+    }
+
+    static String getEventKey(TopicName topicName, boolean isGlobal) {
+        return wrapEventKey(TopicName.get(topicName.getDomain().toString(),
+            topicName.getTenant(),
+            topicName.getNamespace(),
+            
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(), 
isGlobal);
+    }
+
+    static String wrapEventKey(String originalKey, boolean isGlobalPolicies) {
+        if (!isGlobalPolicies) {
+            return originalKey;
+        }
+        return GLOBAL_POLICIES_MSG_KEY_PREFIX + originalKey;
+    }
+
+    static boolean isGlobalPolicy(Message<PulsarEvent> msg) {
+        return msg.getKey().startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX);
+    }
+
+    static TopicName unwrapEventKey(String originalKey) {
+        String tpName = originalKey;
+        if (originalKey.startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX)) {
+            tpName = 
originalKey.substring(GLOBAL_POLICIES_MSG_KEY_PREFIX.length());
+        }
+        return TopicName.get(tpName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c7a064e831d..bf2c3e5b2aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static 
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
 import static 
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.LOCAL_ONLY;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TopicPolicyTestUtils;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -99,6 +101,7 @@ import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.glassfish.jersey.client.JerseyClient;
 import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -3339,6 +3342,189 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    private void triggerAndWaitNewTopicCompaction(String topicName) throws 
Exception {
+        PersistentTopic tp =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        // Wait for the old task finish.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+            assertTrue(compactionTask == null || compactionTask.isDone());
+        });
+        // Trigger a new task.
+        tp.triggerCompaction();
+        // Wait for the new task finish.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+            assertTrue(compactionTask == null || compactionTask.isDone());
+        });
+    }
+
+    /***
+     * It is not a thread safety method, something will go to a wrong pointer 
if there is a task is trying to load a
+     * topic policies.
+     */
+    private void clearTopicPoliciesCache() {
+        TopicPoliciesService topicPoliciesService = 
pulsar.getTopicPoliciesService();
+        if (topicPoliciesService instanceof 
TopicPoliciesService.TopicPoliciesServiceDisabled) {
+            return;
+        }
+        assertTrue(topicPoliciesService instanceof 
SystemTopicBasedTopicPoliciesService);
+
+        Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"policyCacheInitMap");
+        for (CompletableFuture<Void> future : policyCacheInitMap.values()) {
+            future.join();
+        }
+        Map<TopicName, TopicPolicies> policiesCache =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"policiesCache");
+        Map<TopicName, TopicPolicies> globalPoliciesCache =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"globalPoliciesCache");
+
+        policyCacheInitMap.clear();
+        policiesCache.clear();
+        globalPoliciesCache.clear();
+    }
+
+    @DataProvider(name = "reloadPolicyTypes")
+    public Object[][] reloadPolicyTypes() {
+        return new Object[][]{
+            {"Clean_Cache"},
+            {"Recreate_Service"}
+        };
+    }
+
+    @Test(dataProvider = "reloadPolicyTypes")
+    public void testTopicPoliciesAfterCompaction(String reloadPolicyType) 
throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        final String tpNameChangeEvents = "persistent://" + myNamespace + "/" 
+ NAMESPACE_EVENTS_LOCAL_NAME;
+        final String subscriptionName = "s1";
+        final int rateMsgLocal = 2000;
+        final int rateMsgGlobal = 1000;
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
+
+        // Set global policy and local policy.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was affected.
+        DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, 
false, 1);
+        DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
+        admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        Optional<TopicPolicies> topicPoliciesOptional1 = null;
+        Optional<TopicPolicies> topicPoliciesOptionalGlobal1 = null;
+        if ("Clean_Cache".equals(reloadPolicyType)) {
+            clearTopicPoliciesCache();
+            topicPoliciesOptional1 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                            LOCAL_ONLY).join();
+            topicPoliciesOptionalGlobal1 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    GLOBAL_ONLY).join();
+        } else {
+            SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+            topicPoliciesOptional1 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+            topicPoliciesOptionalGlobal1 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY).join();
+            newService.close();
+        }
+        assertTrue(topicPoliciesOptional1.isPresent());
+        
assertEquals(topicPoliciesOptional1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
 rateMsgLocal);
+        
assertEquals(topicPoliciesOptionalGlobal1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+                rateMsgGlobal);
+
+        // Remove local policy.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was affected.
+        admin.topicPolicies(false).removeDispatchRate(tpName);
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        Optional<TopicPolicies> topicPoliciesOptional2 = null;
+        Optional<TopicPolicies> topicPoliciesOptionalGlobal2 = null;
+        if ("Clean_Cache".equals(reloadPolicyType)) {
+            clearTopicPoliciesCache();
+            topicPoliciesOptional2 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    LOCAL_ONLY).join();
+            topicPoliciesOptionalGlobal2 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    GLOBAL_ONLY).join();
+        } else {
+            SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+            topicPoliciesOptional2 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+            topicPoliciesOptionalGlobal2 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY).join();
+            newService.close();
+        }
+        assertTrue(topicPoliciesOptional2.isEmpty() || 
topicPoliciesOptional2.get().getDispatchRate() == null);
+        assertTrue(topicPoliciesOptionalGlobal2.isPresent());
+        
assertEquals(topicPoliciesOptionalGlobal2.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+                rateMsgGlobal);
+
+        // Delete topic.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was deleted.
+        admin.topics().delete(tpName, false);
+        Awaitility.await().untilAsserted(() -> {
+            // Reload polices into memory.
+            // Verify: policies was affected.
+            Optional<TopicPolicies> topicPoliciesOptional3 = null;
+            Optional<TopicPolicies> topicPoliciesOptionalGlobal3 = null;
+            if ("Clean_Cache".equals(reloadPolicyType)) {
+                clearTopicPoliciesCache();
+                topicPoliciesOptional3 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                        LOCAL_ONLY).join();
+                topicPoliciesOptionalGlobal3 = pulsar.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(tpName), 
GLOBAL_ONLY).join();
+            } else {
+                SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+                topicPoliciesOptional3 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+                topicPoliciesOptionalGlobal3 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY)
+                        .join();
+                newService.close();
+            }
+            assertTrue(topicPoliciesOptional3.isEmpty()
+                    || topicPoliciesOptional3.get().getDispatchRate() == null);
+            assertTrue(topicPoliciesOptionalGlobal3.isEmpty()
+                    || topicPoliciesOptionalGlobal3.get().getDispatchRate() == 
null);
+        });
+    }
+
+    @Test
+    public void testDeleteGlobalPolicy() throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        final String tpNameChangeEvents = "persistent://" + myNamespace + "/" 
+ NAMESPACE_EVENTS_LOCAL_NAME;
+        final String subscriptionName = "s1";
+        final int rateMsgGlobal = 1000;
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(tpName).get().get();
+
+        // Set global policy.
+        // Verify: policies was affected.
+        DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(),
 dispatchRateGlobal);
+        });
+
+        // Delete global policy.
+        // Verify: policies were deleted.
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        admin.topicPolicies(true).removeDispatchRate(tpName);
+
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPoliciesOptional = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), 
LOCAL_ONLY).join();
+            Optional<TopicPolicies> topicPoliciesOptionalGlobal = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), 
GLOBAL_ONLY).join();
+            assertTrue(topicPoliciesOptional.isEmpty()
+                    || topicPoliciesOptional.get().getDispatchRate() == null);
+            assertTrue(topicPoliciesOptionalGlobal.isEmpty()
+                    || topicPoliciesOptionalGlobal.get().getDispatchRate() == 
null);
+        });
+
+        // cleanup.
+        admin.topics().delete(tpName, false);
+    }
+
     @Test
     public void testGlobalTopicPolicies() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index aaa719515c9..885309f8960 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import static 
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,7 +127,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
                 .policies(policies)
                 .build())
             .build();
-        systemTopicClientForNamespace1.newWriter().write(getEventKey(event), 
event);
+        systemTopicClientForNamespace1.newWriter().write(getEventKey(event, 
false), event);
         SystemTopicClient.Reader reader = 
systemTopicClientForNamespace1.newReader();
         @Cleanup("release")
         Message<PulsarEvent> received = reader.readNext();

Reply via email to