This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d342f35bd62850a6f030ff6120ce8bd7a1a9bf1b
Author: JiangHaiting <[email protected]>
AuthorDate: Fri Nov 19 18:38:45 2021 +0800

    Remove a topic's listener list once it has no listeners left. (#12654)
    
    (cherry picked from commit d74af88a6aed5a7da3139a4228ae29f793ec72b2)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 21 +++++++++++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 36 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 10f47e6..1cc71e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -69,7 +69,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
     @VisibleForTesting
     final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
-    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> 
listeners = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = 
new ConcurrentHashMap<>();
 
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
@@ -483,12 +484,26 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public void registerListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> 
Lists.newCopyOnWriteArrayList()).add(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners == null) {
+                topicListeners = Lists.newCopyOnWriteArrayList();
+            }
+            topicListeners.add(listener);
+            return topicListeners;
+        });
     }
 
     @Override
     public void unregisterListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> 
Lists.newCopyOnWriteArrayList()).remove(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners != null){
+                topicListeners.remove(listener);
+                if (topicListeners.isEmpty()) {
+                    topicListeners = null;
+                }
+            }
+            return topicListeners;
+        });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 80f3dc9..be52f9a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -76,7 +76,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends 
MockedPulsarServic
     private static final TopicName TOPIC5 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE3), "topic-1");
     private static final TopicName TOPIC6 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE3), "topic-2");
 
-    private NamespaceEventsSystemTopicFactory systemTopicFactory;
     private SystemTopicBasedTopicPoliciesService 
systemTopicBasedTopicPoliciesService;
 
     @BeforeMethod(alwaysRun = true)
@@ -95,6 +94,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
     }
 
     @Test
+    public void testConcurrentlyRegisterUnregisterListeners() throws 
ExecutionException, InterruptedException {
+        TopicName topicName = TopicName.get("test");
+        class TopicPolicyListenerImpl implements 
TopicPolicyListener<TopicPolicies> {
+
+            @Override
+            public void onUpdate(TopicPolicies data) {
+                //no op.
+            }
+        }
+
+        CompletableFuture<Void> f = 
CompletableFuture.completedFuture(null).thenRunAsync(() -> {
+            for (int i = 0; i < 100; i++) {
+                TopicPolicyListener<TopicPolicies> listener = new 
TopicPolicyListenerImpl();
+                
systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+                
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+                
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
 >= 1);
+                
systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+            }
+        });
+
+        for (int i = 0; i < 100; i++) {
+            TopicPolicyListener<TopicPolicies> listener = new 
TopicPolicyListenerImpl();
+            systemTopicBasedTopicPoliciesService.registerListener(topicName, 
listener);
+            
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+            
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
 >= 1);
+            systemTopicBasedTopicPoliciesService.unregisterListener(topicName, 
listener);
+        }
+
+        f.get();
+        //Some system topics will be added to the listeners, so only check that it 
no longer contains topicName.
+        
Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
+    }
+
+    @Test
     public void testGetPolicy() throws ExecutionException, 
InterruptedException, TopicPoliciesCacheNotInitException {
 
         // Init topic policies
@@ -239,7 +272,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         admin.lookups().lookupTopic(TOPIC4.toString());
         admin.lookups().lookupTopic(TOPIC5.toString());
         admin.lookups().lookupTopic(TOPIC6.toString());
-        systemTopicFactory = new 
NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }
 

Reply via email to