mattisonchao commented on code in PR #21231:
URL: https://github.com/apache/pulsar/pull/21231#discussion_r1335713369


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -265,39 +296,48 @@ public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicNa
 
     @Override
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.isHeartbeatNamespace(namespace)) {
-            result.complete(null);
-            return result;
+            return CompletableFuture.completedFuture(null);
         }
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
-                result.complete(null);
+                return CompletableFuture.completedFuture(null);
             } else {
-                prepareInitPoliciesCache(namespace, result);
+                return prepareInitPoliciesCacheAsync(namespace);
             }
         }
-        return result;
     }
 
-    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, 
CompletableFuture<Void> result) {
-        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+    private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+        requireNonNull(namespace);
+        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            readerCompletableFuture.thenAccept(reader -> {
-                initPolicesCache(reader, result);
-                result.thenRun(() -> readMorePolicies(reader));
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                cleanCacheAndCloseReader(namespace, false);
-                result.completeExceptionally(ex);
+            final CompletableFuture<Void> initFuture = readerCompletableFuture
+                    .thenCompose(reader -> {
+                        final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
+                        initPolicesCache(reader, stageFuture);
+                        return stageFuture
+                                // Read policies in background
+                                .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                    });
+            initFuture.exceptionally(ex -> {
+                try {
+                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                    cleanCacheAndCloseReader(namespace, false);

Review Comment:
   1. We are using concurrent hashmap with the `computeIfAbsent` method to get 
the future. it should be thread-safe.
   2. When `null` is returned in the compute method, the map will remove the 
corresponding entry/node." [1]
   
   [1] The definition of Map interface. 
https://github.com/openjdk/jdk/blob/0f0c5b2d71e6dec442a5105ba305043cb59b99fc/src/java.base/share/classes/java/util/Map.java#L1166C14-L1166C14
   
   
https://github.com/openjdk/jdk/blob/0f0c5b2d71e6dec442a5105ba305043cb59b99fc/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java#L1945C41-L1952



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to