This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d342f35bd62850a6f030ff6120ce8bd7a1a9bf1b Author: JiangHaiting <[email protected]> AuthorDate: Fri Nov 19 18:38:45 2021 +0800 Remove unused listeners if it have no listeners. (#12654) (cherry picked from commit d74af88a6aed5a7da3139a4228ae29f793ec72b2) --- .../SystemTopicBasedTopicPoliciesService.java | 21 +++++++++++-- .../SystemTopicBasedTopicPoliciesServiceTest.java | 36 ++++++++++++++++++++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 10f47e6..1cc71e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -69,7 +69,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @VisibleForTesting final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>(); - private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>(); + @VisibleForTesting + final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>(); public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.pulsarService = pulsarService; @@ -483,12 +484,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @Override public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) { - listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener); + listeners.compute(topicName, (k, topicListeners) -> { + if (topicListeners == null) { + topicListeners = Lists.newCopyOnWriteArrayList(); + } + topicListeners.add(listener); + return topicListeners; + }); } @Override public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) { - listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener); + listeners.compute(topicName, (k, topicListeners) -> { + if (topicListeners != null){ + topicListeners.remove(listener); + if (topicListeners.isEmpty()) { + topicListeners = null; + } + } + return topicListeners; + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 80f3dc9..be52f9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -76,7 +76,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1"); private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2"); - private NamespaceEventsSystemTopicFactory systemTopicFactory; private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService; @BeforeMethod(alwaysRun = true) @@ -95,6 +94,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic } @Test + public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException { + TopicName topicName = TopicName.get("test"); + class TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> { + + @Override + public void onUpdate(TopicPolicies data) { + //no op. + } + } + + CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> { + for (int i = 0; i < 100; i++) { + TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl(); + systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); + Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); + systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); + } + }); + + for (int i = 0; i < 100; i++) { + TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl(); + systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); + Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); + systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); + } + + f.get(); + //Some system topics will be added to the listeners. Just check if it contains topicName. + Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName)); + } + + @Test public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException { // Init topic policies @@ -239,7 +272,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic admin.lookups().lookupTopic(TOPIC4.toString()); admin.lookups().lookupTopic(TOPIC5.toString()); admin.lookups().lookupTopic(TOPIC6.toString()); - systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); }
