This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new bda4fedd6a3 [fix][broker] Avoid recursive update in ConcurrentHashMap
during policy cache cleanup (#24939)
bda4fedd6a3 is described below
commit bda4fedd6a3bc0469f3e978ddc7fd96bc7ac094a
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800
[fix][broker] Avoid recursive update in ConcurrentHashMap during policy
cache cleanup (#24939)
(cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d)
---
.../SystemTopicBasedTopicPoliciesService.java | 7 ++--
.../SystemTopicBasedTopicPoliciesServiceTest.java | 49 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e2f58b684db..9b8fe6c7bdd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -427,7 +427,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
// Read policies in
background
.thenAccept(__ ->
readMorePoliciesAsync(reader));
});
- initFuture.exceptionally(ex -> {
+ initFuture.exceptionallyAsync(ex -> {
try {
log.error("[{}] Failed to create
reader on __change_events topic",
namespace, ex);
@@ -438,7 +438,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
namespace, cleanupEx);
}
return null;
- });
+ }, pulsarService.getExecutor());
// let caller know we've got an exception.
return initFuture;
});
@@ -698,7 +698,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
- private NamespaceEventsSystemTopicFactory
getNamespaceEventsSystemTopicFactory() {
+ @VisibleForTesting
+ NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
try {
return namespaceEventsSystemTopicFactoryLazyInitializer.get();
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index f79c7412bc7..ab0f23d455d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -483,4 +485,51 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
service.prepareInitPoliciesCacheAsync(namespaceName).get();
admin.namespaces().deleteNamespace(NAMESPACE5);
}
+
+ @Test
+ public void testCreateNamespaceEventsSystemTopicFactoryException() throws
Exception {
+ final String namespace = "system-topic/namespace-6";
+
+ admin.namespaces().createNamespace(namespace);
+
+ TopicName topicName = TopicName.get("persistent",
NamespaceName.get(namespace), "topic-1");
+
+ SystemTopicBasedTopicPoliciesService service =
+ Mockito.spy((SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService());
+
+ // inject exception when create NamespaceEventsSystemTopicFactory
+ Mockito.doThrow(new RuntimeException("test exception"))
+ .doCallRealMethod()
+ .when(service)
+ .getNamespaceEventsSystemTopicFactory();
+
+ CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+ Optional<TopicPolicies> topicPoliciesOptional;
+ try {
+ topicPoliciesFuture =
+ service.getTopicPoliciesAsync(topicName, false);
+ topicPoliciesOptional = topicPoliciesFuture.join();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause().getMessage().contains("test
exception"));
+ }
+
+ TopicPolicies updatedTopicPolicies = new TopicPolicies();
+ updatedTopicPolicies.setMaxConsumerPerTopic(10);
+ Awaitility.await().untilAsserted(() -> {
+ assertThat(service.updateTopicPoliciesAsync(topicName,
updatedTopicPolicies))
+ .succeedsWithin(Duration.ofSeconds(2));
+ });
+
+ topicPoliciesFuture =
+ service.getTopicPoliciesAsync(topicName, false);
+ topicPoliciesOptional = topicPoliciesFuture.join();
+
+ Assert.assertNotNull(topicPoliciesOptional);
+ Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+ TopicPolicies topicPolicies = topicPoliciesOptional.get();
+ Assert.assertNotNull(topicPolicies);
+ Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+ }
}