This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c2ea6a9afeee9f436d9c44564b5f9e8d343e334c
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800

    [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
    
    (cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 54 ++++++++++++++++++++--
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 880498209a6..c745d591a4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -589,7 +589,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                     // Read policies in 
background
                                                     .thenAccept(__ -> 
readMorePoliciesAsync(reader));
                                         });
-                                initFuture.exceptionally(ex -> {
+                                initFuture.exceptionallyAsync(ex -> {
                                     try {
                                         if (closed.get()) {
                                             return null;
@@ -603,7 +603,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                });
+                                }, pulsarService.getExecutor());
                                 // let caller know we've got an exception.
                                 return initFuture;
                             }).thenApply(__ -> true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8149b7a9435..4326f83d763 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.spy;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -45,7 +48,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
-import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -319,12 +321,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
                 spy(new ConcurrentHashMap<TopicName, TopicPolicies>());
         FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", 
spyPoliciesCache, true);
 
-        Awaitility.await().untilAsserted(() -> Assertions.assertThat(
+        Awaitility.await().untilAsserted(() -> assertThat(
                 TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, 
TopicName.get(topic))).isNull());
 
         admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
         Awaitility.await().untilAsserted(() -> {
-                
Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
+                
assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
                         TopicName.get(topic))).isNotNull();
             });
 
@@ -421,4 +423,50 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         service.prepareInitPoliciesCacheAsync(namespaceName).get();
         admin.namespaces().deleteNamespace(NAMESPACE5);
     }
+
+    @Test
+    public void testCreateNamespaceEventsSystemTopicFactoryException() throws 
Exception {
+        final String namespace = "system-topic/namespace-6";
+
+        admin.namespaces().createNamespace(namespace);
+
+        TopicName topicName = TopicName.get("persistent", 
NamespaceName.get(namespace), "topic-1");
+
+        SystemTopicBasedTopicPoliciesService service =
+            Mockito.spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
+
+        // inject exception when create NamespaceEventsSystemTopicFactory
+        Mockito.doThrow(new RuntimeException("test exception"))
+                .doCallRealMethod()
+                .when(service)
+            .getNamespaceEventsSystemTopicFactory();
+
+        CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        Optional<TopicPolicies> topicPoliciesOptional;
+        try {
+            topicPoliciesFuture =
+            service.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY);
+            topicPoliciesOptional = topicPoliciesFuture.join();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getMessage().contains("test 
exception"));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(service.updateTopicPoliciesAsync(topicName, false, 
false, topicPolicies ->
+                    topicPolicies.setMaxConsumerPerTopic(10)))
+                    .succeedsWithin(Duration.ofSeconds(2));
+        });
+
+        topicPoliciesFuture =
+            service.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY);
+        topicPoliciesOptional = topicPoliciesFuture.join();
+
+        Assert.assertNotNull(topicPoliciesOptional);
+        Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+        TopicPolicies topicPolicies = topicPoliciesOptional.get();
+        Assert.assertNotNull(topicPolicies);
+        Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+    }
 }

Reply via email to