This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b8e6948f62d [fix][broker] Broker failed to load v1 namespace resources cache (#20783)
b8e6948f62d is described below

commit b8e6948f62d6ec2ca53b6a85fe2fd07d4dee6853
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Jul 21 15:41:50 2023 -0700

    [fix][broker] Broker failed to load v1 namespace resources cache (#20783)
---
 .../pulsar/broker/resources/BaseResources.java     | 36 ++++++++++++++++++++++
 .../broker/resources/NamespaceResources.java       |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 14 +++++++++
 3 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 42add4271f6..4011a482075 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,11 +20,16 @@ package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -78,6 +83,37 @@ public class BaseResources<T> {
         return cache.getChildren(path);
     }
 
+    protected CompletableFuture<List<String>> getChildrenRecursiveAsync(String 
path) {
+        Set<String> children = ConcurrentHashMap.newKeySet();
+        CompletableFuture<List<String>> result = new CompletableFuture<>();
+        getChildrenRecursiveAsync(path, children, result, new 
AtomicInteger(1), path);
+        return result;
+    }
+
+    private void getChildrenRecursiveAsync(String path, Set<String> children, 
CompletableFuture<List<String>> result,
+            AtomicInteger totalResults, String parent) {
+        cache.getChildren(path).thenAccept(childList -> {
+            childList = childList != null ? childList : 
Collections.emptyList();
+            if (totalResults.decrementAndGet() == 0 && childList.isEmpty()) {
+                result.complete(new ArrayList<>(children));
+                return;
+            }
+            if (childList.isEmpty()) {
+                return;
+            }
+            // remove current node from children if current node is not leaf
+            children.remove(parent);
+            // childPrefix creates a path hierarchy if children has multi 
level path
+            String childPrefix = path.equals(parent) ? "" : parent + "/";
+            totalResults.addAndGet(childList.size());
+            for (String child : childList) {
+                children.add(childPrefix + child);
+                String childPath = path + "/" + child;
+                getChildrenRecursiveAsync(childPath, children, result, 
totalResults, child);
+            }
+        });
+    }
+
     protected Optional<T> get(String path) throws MetadataStoreException {
         try {
             return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 48f82596567..e5dd13c32eb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -62,7 +62,7 @@ public class NamespaceResources extends BaseResources<Policies> {
     }
 
     public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
-        return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant));
+        return getChildrenRecursiveAsync(joinPath(BASE_POLICIES_PATH, tenant));
     }
 
     public CompletableFuture<Boolean> getPoliciesReadOnlyAsync() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 83ed63bf0d9..a4f6bd4650f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -1700,4 +1701,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // verify we only call getReplicatedSubscriptionStatusAsync once.
         verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), 
any());
     }
+
+    @Test
+    public void testNamespaceResources() throws Exception {
+        String ns1V1 = "test/" + testNamespace + "v1";
+        String ns1V2 = testNamespace + "v2";
+        admin.namespaces().createNamespace(testTenant+"/"+ns1V1);
+        admin.namespaces().createNamespace(testTenant+"/"+ns1V2);
+
+        List<String> namespaces = 
pulsar.getPulsarResources().getNamespaceResources().listNamespacesAsync(testTenant)
+                .get();
+        assertTrue(namespaces.contains(ns1V2));
+        assertTrue(namespaces.contains(ns1V1));
+    }
 }

Reply via email to