This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bda4fedd6a3 [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
bda4fedd6a3 is described below

commit bda4fedd6a3bc0469f3e978ddc7fd96bc7ac094a
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800

    [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
    
    (cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  7 ++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e2f58b684db..9b8fe6c7bdd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -427,7 +427,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                     // Read policies in 
background
                                                     .thenAccept(__ -> 
readMorePoliciesAsync(reader));
                                         });
-                                initFuture.exceptionally(ex -> {
+                                initFuture.exceptionallyAsync(ex -> {
                                     try {
                                         log.error("[{}] Failed to create 
reader on __change_events topic",
                                                 namespace, ex);
@@ -438,7 +438,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                });
+                                }, pulsarService.getExecutor());
                                 // let caller know we've got an exception.
                                 return initFuture;
                             });
@@ -698,7 +698,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    private NamespaceEventsSystemTopicFactory 
getNamespaceEventsSystemTopicFactory() {
+    @VisibleForTesting
+    NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
         try {
             return namespaceEventsSystemTopicFactoryLazyInitializer.get();
         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index f79c7412bc7..ab0f23d455d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.spy;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 import static org.testng.AssertJUnit.assertTrue;
 import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -483,4 +485,51 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         service.prepareInitPoliciesCacheAsync(namespaceName).get();
         admin.namespaces().deleteNamespace(NAMESPACE5);
     }
+
+    @Test
+    public void testCreateNamespaceEventsSystemTopicFactoryException() throws 
Exception {
+        final String namespace = "system-topic/namespace-6";
+
+        admin.namespaces().createNamespace(namespace);
+
+        TopicName topicName = TopicName.get("persistent", 
NamespaceName.get(namespace), "topic-1");
+
+        SystemTopicBasedTopicPoliciesService service =
+                Mockito.spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
+
+        // inject exception when create NamespaceEventsSystemTopicFactory
+        Mockito.doThrow(new RuntimeException("test exception"))
+                .doCallRealMethod()
+                .when(service)
+                .getNamespaceEventsSystemTopicFactory();
+
+        CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        Optional<TopicPolicies> topicPoliciesOptional;
+        try {
+            topicPoliciesFuture =
+                    service.getTopicPoliciesAsync(topicName, false);
+            topicPoliciesOptional = topicPoliciesFuture.join();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getMessage().contains("test 
exception"));
+        }
+
+        TopicPolicies updatedTopicPolicies = new TopicPolicies();
+        updatedTopicPolicies.setMaxConsumerPerTopic(10);
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(service.updateTopicPoliciesAsync(topicName, 
updatedTopicPolicies))
+                    .succeedsWithin(Duration.ofSeconds(2));
+        });
+
+        topicPoliciesFuture =
+                service.getTopicPoliciesAsync(topicName, false);
+        topicPoliciesOptional = topicPoliciesFuture.join();
+
+        Assert.assertNotNull(topicPoliciesOptional);
+        Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+        TopicPolicies topicPolicies = topicPoliciesOptional.get();
+        Assert.assertNotNull(topicPolicies);
+        Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+    }
 }

Reply via email to