This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 990fb2f5077b2885c03c4b39c5c0e9a2d3229dde
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800

    [fix][broker] Avoid recursive update in ConcurrentHashMap during policy 
cache cleanup (#24939)
    
    (cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  7 +--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 51 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 6ea557f63c9..ebb6f3a1069 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -430,7 +430,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                     // Read policies in 
background
                                                     .thenAccept(__ -> 
readMorePoliciesAsync(reader));
                                         });
-                                initFuture.exceptionally(ex -> {
+                                initFuture.exceptionallyAsync(ex -> {
                                     try {
                                         if (closed.get()) {
                                             return null;
@@ -444,7 +444,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                });
+                                }, pulsarService.getExecutor());
                                 // let caller know we've got an exception.
                                 return initFuture;
                             });
@@ -704,7 +704,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    private NamespaceEventsSystemTopicFactory 
getNamespaceEventsSystemTopicFactory() {
+    @VisibleForTesting
+    NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
         try {
             return namespaceEventsSystemTopicFactoryLazyInitializer.get();
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 9caee00cb61..70c506b68de 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.spy;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 import static org.testng.AssertJUnit.assertTrue;
 import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -45,14 +47,14 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -484,4 +486,49 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         service.prepareInitPoliciesCacheAsync(namespaceName).get();
         admin.namespaces().deleteNamespace(NAMESPACE5);
     }
+
+    @Test
+    public void testCreateNamespaceEventsSystemTopicFactoryException() throws 
Exception {
+        final String namespace = "system-topic/namespace-6";
+
+        admin.namespaces().createNamespace(namespace);
+
+        TopicName topicName = TopicName.get("persistent", 
NamespaceName.get(namespace), "topic-1");
+
+        SystemTopicBasedTopicPoliciesService service =
+                Mockito.spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
+
+        // inject exception when create NamespaceEventsSystemTopicFactory
+        Mockito.doThrow(new RuntimeException("test exception"))
+                .doCallRealMethod()
+                .when(service)
+                .getNamespaceEventsSystemTopicFactory();
+
+        try {
+            service.getTopicPoliciesAsync(topicName, false).join();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getMessage().contains("test 
exception"));
+        }
+
+        TopicPolicies updatedTopicPolicies = new TopicPolicies();
+        updatedTopicPolicies.setMaxConsumerPerTopic(10);
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(service.updateTopicPoliciesAsync(topicName, 
updatedTopicPolicies))
+                    .succeedsWithin(Duration.ofSeconds(2));
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+                    service.getTopicPoliciesAsync(topicName, false);
+            Optional<TopicPolicies> topicPoliciesOptional = 
topicPoliciesFuture.join();
+
+            Assert.assertNotNull(topicPoliciesOptional);
+            Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+            TopicPolicies topicPolicies = topicPoliciesOptional.get();
+            Assert.assertNotNull(topicPolicies);
+            Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+        });
+    }
 }

Reply via email to