This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 990fb2f5077b2885c03c4b39c5c0e9a2d3229dde Author: Cong Zhao <[email protected]> AuthorDate: Wed Nov 5 02:46:05 2025 +0800 [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939) (cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d) --- .../SystemTopicBasedTopicPoliciesService.java | 7 +-- .../SystemTopicBasedTopicPoliciesServiceTest.java | 51 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 6ea557f63c9..ebb6f3a1069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -430,7 +430,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic // Read policies in background .thenAccept(__ -> readMorePoliciesAsync(reader)); }); - initFuture.exceptionally(ex -> { + initFuture.exceptionallyAsync(ex -> { try { if (closed.get()) { return null; @@ -444,7 +444,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic namespace, cleanupEx); } return null; - }); + }, pulsarService.getExecutor()); // let caller know we've got an exception. return initFuture; }); @@ -704,7 +704,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } - private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { + @VisibleForTesting + NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { try { return namespaceEventsSystemTopicFactoryLazyInitializer.get(); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 9caee00cb61..70c506b68de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.broker.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.spy; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import java.lang.reflect.Field; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,14 +47,14 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -484,4 +486,49 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic service.prepareInitPoliciesCacheAsync(namespaceName).get(); admin.namespaces().deleteNamespace(NAMESPACE5); } + + @Test + public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception { + final String namespace = "system-topic/namespace-6"; + + admin.namespaces().createNamespace(namespace); + + TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1"); + + SystemTopicBasedTopicPoliciesService service = + Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService()); + + // inject exception when create NamespaceEventsSystemTopicFactory + Mockito.doThrow(new RuntimeException("test exception")) + .doCallRealMethod() + .when(service) + .getNamespaceEventsSystemTopicFactory(); + + try { + service.getTopicPoliciesAsync(topicName, false).join(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause().getMessage().contains("test exception")); + } + + TopicPolicies updatedTopicPolicies = new TopicPolicies(); + updatedTopicPolicies.setMaxConsumerPerTopic(10); + Awaitility.await().untilAsserted(() -> { + assertThat(service.updateTopicPoliciesAsync(topicName, updatedTopicPolicies)) + .succeedsWithin(Duration.ofSeconds(2)); + }); + + Awaitility.await().untilAsserted(() -> { + CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = + service.getTopicPoliciesAsync(topicName, false); + Optional<TopicPolicies> topicPoliciesOptional = topicPoliciesFuture.join(); + + Assert.assertNotNull(topicPoliciesOptional); + Assert.assertTrue(topicPoliciesOptional.isPresent()); + + TopicPolicies topicPolicies = topicPoliciesOptional.get(); + Assert.assertNotNull(topicPolicies); + Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10); + }); + } }
