This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 96d77f7e1d5 [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
96d77f7e1d5 is described below

commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3
Author: hanmz <[email protected]>
AuthorDate: Mon Mar 18 06:45:02 2024 +0800

    [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 66 +++++++++++++---------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 19 +++++++
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 71f78e21f93..4e9e875bcf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
-    private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+    @VisibleForTesting
+    @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull 
NamespaceName namespace) {
         requireNonNull(namespace);
-        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                    createSystemTopicClient(namespace);
-            readerCaches.put(namespace, readerCompletableFuture);
-            ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            final CompletableFuture<Void> initFuture = readerCompletableFuture
-                    .thenCompose(reader -> {
-                        final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
-                        initPolicesCache(reader, stageFuture);
-                        return stageFuture
-                                // Read policies in background
-                                .thenAccept(__ -> 
readMorePoliciesAsync(reader));
-                    });
-            initFuture.exceptionally(ex -> {
-                try {
-                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                    cleanCacheAndCloseReader(namespace, false);
-                } catch (Throwable cleanupEx) {
-                    // Adding this catch to avoid break callback chain
-                    log.error("[{}] Failed to cleanup reader on 
__change_events topic", namespace, cleanupEx);
-                }
-                return null;
-            });
-            // let caller know we've got an exception.
-            return initFuture;
-        });
+        return 
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
+                        .thenCompose(namespacePolicies -> {
+                            if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
+                                log.info("[{}] skip prepare init policies 
cache since the namespace is deleted",
+                                        namespace);
+                                return CompletableFuture.completedFuture(null);
+                            }
+
+                            return 
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+                                final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                                        createSystemTopicClient(namespace);
+                                readerCaches.put(namespace, 
readerCompletableFuture);
+                                
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
+                                final CompletableFuture<Void> initFuture = 
readerCompletableFuture
+                                        .thenCompose(reader -> {
+                                            final CompletableFuture<Void> 
stageFuture = new CompletableFuture<>();
+                                            initPolicesCache(reader, 
stageFuture);
+                                            return stageFuture
+                                                    // Read policies in 
background
+                                                    .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                                        });
+                                initFuture.exceptionally(ex -> {
+                                    try {
+                                        log.error("[{}] Failed to create 
reader on __change_events topic",
+                                                namespace, ex);
+                                        cleanCacheAndCloseReader(namespace, 
false);
+                                    } catch (Throwable cleanupEx) {
+                                        // Adding this catch to avoid break 
callback chain
+                                        log.error("[{}] Failed to cleanup 
reader on __change_events topic",
+                                                namespace, cleanupEx);
+                                    }
+                                    return null;
+                                });
+                                // let caller know we've got an exception.
+                                return initFuture;
+                            });
+                        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClient(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 1b928904274..9a5ac50e5a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -71,6 +71,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
     private static final String NAMESPACE4 = "system-topic/namespace-4";
 
+    private static final String NAMESPACE5 = "system-topic/namespace-5";
+
     private static final TopicName TOPIC1 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE1), "topic-1");
     private static final TopicName TOPIC2 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE1), "topic-2");
     private static final TopicName TOPIC3 = TopicName.get("persistent", 
NamespaceName.get(NAMESPACE2), "topic-1");
@@ -465,4 +467,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         admin.namespaces().deleteNamespace(NAMESPACE4);
         
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
     }
+
+    @Test
+    public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() 
throws Exception {
+        SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        admin.namespaces().createNamespace(NAMESPACE5);
+
+        NamespaceName namespaceName = NamespaceName.get(NAMESPACE5);
+        
pulsar.getPulsarResources().getNamespaceResources().setPolicies(namespaceName,
+                old -> {
+                    old.deleted = true;
+                    return old;
+                });
+
+        assertNull(service.getPoliciesCacheInit(namespaceName));
+        service.prepareInitPoliciesCacheAsync(namespaceName).get();
+        admin.namespaces().deleteNamespace(NAMESPACE5);
+    }
 }

Reply via email to