This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0edcaa0  [pulsar-broker] cluster resources use metadata-store api 
(#9338)
0edcaa0 is described below

commit 0edcaa09150521a2a7e189de43d004ed799db2ee
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Jan 28 12:20:10 2021 -0800

    [pulsar-broker] cluster resources use metadata-store api (#9338)
    
    * [pulsar-broker] Make tenant rest-api async and use metadata-store api
    
    fix tests
    
    fix intermittent test failure
    
    * [pulsar-broker] cluster resources use metadata-store api
    
    * fix test
---
 .../org/apache/pulsar/broker/PulsarService.java    |  18 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   6 +-
 .../pulsar/broker/admin/impl/BaseResources.java    | 152 ++++++++++
 .../pulsar/broker/admin/impl/ClusterResources.java |  51 ++++
 .../pulsar/broker/admin/impl/ClustersBase.java     | 204 +++++---------
 .../broker/admin/impl/NamespaceResources.java      |  51 ++++
 .../pulsar/broker/admin/impl/PulsarResources.java  |  37 +++
 .../pulsar/broker/admin/impl/TenantResources.java  |  28 ++
 .../pulsar/broker/admin/impl/TenantsBase.java      | 307 ++++++++++++---------
 .../pulsar/broker/web/PulsarWebResource.java       | 157 ++++++++++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 238 +++++++++++-----
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   1 +
 .../OwnerShipForCurrentServerTestBase.java         |   2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |   8 +-
 .../broker/transaction/TransactionTestBase.java    |   2 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |  23 +-
 .../metadata/api/MetadataStoreException.java       |   4 +
 .../metadata/cache/impl/MetadataCacheImpl.java     |  10 +-
 .../metadata/impl/AbstractMetadataStore.java       |   7 +
 .../MockedZooKeeperClientFactoryImpl.java          |   2 +-
 21 files changed, 958 insertions(+), 358 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a017d28..3e03297 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -69,6 +69,7 @@ import 
org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PulsarResources;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -220,6 +221,8 @@ public class PulsarService implements AutoCloseable {
     private MetadataStoreExtended localMetadataStore;
     private CoordinationService coordinationService;
 
+    private MetadataStoreExtended configurationMetadataStore;
+    private PulsarResources pulsarResources;
 
     public enum State {
         Init, Started, Closed
@@ -280,6 +283,14 @@ public class PulsarService implements AutoCloseable {
                 new DefaultThreadFactory("zk-cache-callback"));
         }
 
+    public MetadataStoreExtended createConfigurationMetadataStore() throws 
MetadataStoreException {
+        return 
MetadataStoreExtended.create(config.getConfigurationStoreServers(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis((int) 
config.getZooKeeperSessionTimeoutMillis())
+                        .allowReadOnlyOperations(false)
+                        .build());
+    }
+
     /**
      * Close the current pulsar service. All resources are released.
      */
@@ -396,6 +407,9 @@ public class PulsarService implements AutoCloseable {
             if (localMetadataStore != null) {
                 localMetadataStore.close();
             }
+            if (configurationMetadataStore != null) {
+                configurationMetadataStore.close();
+            }
 
             state = State.Closed;
             isClosedCondition.signalAll();
@@ -467,9 +481,11 @@ public class PulsarService implements AutoCloseable {
             }
 
             localMetadataStore = createLocalMetadataStore();
-
             coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
+            configurationMetadataStore = createConfigurationMetadataStore();
+            pulsarResources = new PulsarResources(configurationMetadataStore);
+
             orderedExecutor = OrderedExecutor.newBuilder()
                     .numThreads(config.getNumOrderedExecutorThreads())
                     .name("pulsar-ordered")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1b98ba1..1728c88 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -89,7 +89,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = 
LoggerFactory.getLogger(AdminResource.class);
-    private static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
+    public static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = 
"partitioned-topics";
     private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
 
@@ -169,7 +169,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     // This is a stub method for Mockito
     @Override
-    protected void validateSuperUserAccess() {
+    public void validateSuperUserAccess() {
         super.validateSuperUserAccess();
     }
 
@@ -740,7 +740,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
    protected void validateClusterExists(String cluster) {
         try {
-            if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+            if (!clusterResources().get(path("clusters", 
cluster)).isPresent()) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Cluster " 
+ cluster + " does not exist.");
             }
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
new file mode 100644
index 0000000..07cd9c4
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Base class for all configuration resources to access configurations from 
metadata-store.
+ *
+ * @param <T>
+ *            type of configuration-resources.
+ */
+public class BaseResources<T> {
+
+    @Getter
+    private final MetadataStoreExtended store;
+    @Getter
+    private final MetadataCache<T> cache;
+
+    public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
+        this.store = store;
+        this.cache = store.getMetadataCache(clazz);
+    }
+
+    public BaseResources(MetadataStoreExtended store, TypeReference<T> 
typeRef) {
+        this.store = store;
+        this.cache = store.getMetadataCache(typeRef);
+    }
+
+    public List<String> getChildren(String path) throws MetadataStoreException 
{
+        try {
+            return getChildrenAsync(path).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to get childeren of " + 
path, e);
+        }
+    }
+
+    public CompletableFuture<List<String>> getChildrenAsync(String path) {
+        return cache.getChildren(path);
+    }
+
+    public Optional<T> get(String path) throws MetadataStoreException {
+        try {
+            return getAsync(path).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to get data from " + 
path, e);
+        }
+    }
+
+    public CompletableFuture<Optional<T>> getAsync(String path) {
+        return cache.get(path);
+    }
+
+    public void set(String path, Function<T, T> modifyFunction) throws 
MetadataStoreException {
+        try {
+            setAsync(path, modifyFunction).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to set data for " + path, 
e);
+        }
+    }
+
+    public CompletableFuture<Void> setAsync(String path, Function<T, T> 
modifyFunction) {
+        return cache.readModifyUpdate(path, modifyFunction);
+    }
+
+    public void create(String path, T data) throws MetadataStoreException {
+        create(path, t -> data);
+    }
+
+    public void create(String path, Function<Optional<T>, T> createFunction) 
throws MetadataStoreException {
+        try {
+            createAsync(path, createFunction).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to create " + path, e);
+        }
+    }
+
+    public CompletableFuture<Void> createAsync(String path, T data) {
+        return createAsync(path, t -> data);
+    }
+
+    public CompletableFuture<Void> createAsync(String path, 
Function<Optional<T>, T> createFunction) {
+        return cache.readModifyUpdateOrCreate(path, createFunction);
+    }
+
+    public void delete(String path) throws MetadataStoreException {
+        try {
+            deleteAsync(path).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to delete " + path, e);
+        }
+    }
+
+    public CompletableFuture<Void> deleteAsync(String path) {
+        return cache.delete(path);
+    }
+
+    public boolean exists(String path) throws MetadataStoreException {
+        try {
+            return existsAsync(path).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to check exist " + path, 
e);
+        }
+    }
+
+    public CompletableFuture<Boolean> existsAsync(String path) {
+        return cache.exists(path);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
new file mode 100644
index 0000000..d580f2e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class ClusterResources extends BaseResources<ClusterData> {
+
+    public static final String CLUSTERS_ROOT = "/admin/clusters";
+    @Getter
+    private FailureDomainResources failureDomainResources;
+
+    public ClusterResources(MetadataStoreExtended store) {
+        super(store, ClusterData.class);
+        this.failureDomainResources = new FailureDomainResources(store, 
FailureDomain.class);
+    }
+
+    public Set<String> list() throws MetadataStoreException {
+        return new HashSet<>(super.getChildren(CLUSTERS_ROOT));
+    }
+
+    public static class FailureDomainResources extends 
BaseResources<FailureDomain> {
+        public static final String FAILURE_DOMAIN = "failureDomain";
+
+        public FailureDomainResources(MetadataStoreExtended store, 
Class<FailureDomain> clazz) {
+            super(store, clazz);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index d2714ec..b483114 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
@@ -30,7 +28,6 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Example;
 import io.swagger.annotations.ExampleProperty;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -51,9 +48,10 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
+import 
org.apache.pulsar.broker.admin.impl.ClusterResources.FailureDomainResources;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.common.naming.Constants;
@@ -66,14 +64,11 @@ import 
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ClustersBase extends AdminResource {
+public class ClustersBase extends PulsarWebResource {
 
     @GET
     @ApiOperation(
@@ -87,7 +82,7 @@ public class ClustersBase extends AdminResource {
     public Set<String> getClusters() throws Exception {
         try {
             // Remove "global" cluster from returned list
-            Set<String> clusters = clustersListCache().get().stream()
+            Set<String> clusters = clusterResources().list().stream()
                     .filter(cluster -> 
!Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet());
             return clusters;
         } catch (Exception e) {
@@ -119,7 +114,7 @@ public class ClustersBase extends AdminResource {
         validateSuperUserAccess();
 
         try {
-            return clustersCache().get(path("clusters", cluster))
+            return clusterResources().get(path("clusters", cluster))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Cluster does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, 
e);
@@ -171,11 +166,12 @@ public class ClustersBase extends AdminResource {
 
         try {
             NamedEntity.checkName(cluster);
-            zkCreate(path("clusters", cluster), 
jsonMapper().writeValueAsBytes(clusterData));
+            if (clusterResources().get(path("clusters", cluster)).isPresent()) 
{
+                log.warn("[{}] Failed to create already existing cluster {}", 
clientAppId(), cluster);
+                throw new RestException(Status.CONFLICT, "Cluster already 
exists");
+            }
+            clusterResources().create(path("clusters", cluster), clusterData);
             log.info("[{}] Created cluster {}", clientAppId(), cluster);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing cluster {}", 
clientAppId(), cluster);
-            throw new RestException(Status.CONFLICT, "Cluster already exists");
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Failed to create cluster with invalid name {}", 
clientAppId(), cluster, e);
             throw new RestException(Status.PRECONDITION_FAILED, "Cluster name 
is not valid");
@@ -222,23 +218,12 @@ public class ClustersBase extends AdminResource {
         validatePoliciesReadOnlyAccess();
 
         try {
-            String clusterPath = path("clusters", cluster);
-            Stat nodeStat = new Stat();
-            byte[] content = globalZk().getData(clusterPath, null, nodeStat);
-            ClusterData currentClusterData = null;
-            if (content.length > 0) {
-                currentClusterData = jsonMapper().readValue(content, 
ClusterData.class);
-                // only update cluster-url-data and not overwrite other 
metadata such as peerClusterNames
-                currentClusterData.update(clusterData);
-            } else {
-                currentClusterData = clusterData;
-            }
-            // Write back the new updated ClusterData into zookeeper
-            globalZk().setData(clusterPath, 
jsonMapper().writeValueAsBytes(currentClusterData),
-                    nodeStat.getVersion());
-            globalZkCache().invalidate(clusterPath);
+            clusterResources().set(path("clusters", cluster), old -> {
+                old.update(clusterData);
+                return old;
+            });
             log.info("[{}] Updated cluster {}", clientAppId(), cluster);
-        } catch (KeeperException.NoNodeException e) {
+        } catch (NotFoundException e) {
             log.warn("[{}] Failed to update cluster {}: Does not exist", 
clientAppId(), cluster);
             throw new RestException(Status.NOT_FOUND, "Cluster does not 
exist");
         } catch (Exception e) {
@@ -292,7 +277,7 @@ public class ClustersBase extends AdminResource {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 cluster + " itself can't be part of 
peer-list");
                     }
-                    clustersCache().get(path("clusters", peerCluster))
+                    clusterResources().get(path("clusters", peerCluster))
                             .orElseThrow(() -> new 
RestException(Status.PRECONDITION_FAILED,
                                     "Peer cluster " + peerCluster + " does not 
exist"));
                 } catch (RestException e) {
@@ -308,16 +293,12 @@ public class ClustersBase extends AdminResource {
         }
 
         try {
-            String clusterPath = path("clusters", cluster);
-            Stat nodeStat = new Stat();
-            byte[] content = globalZk().getData(clusterPath, null, nodeStat);
-            ClusterData currentClusterData = jsonMapper().readValue(content, 
ClusterData.class);
-            currentClusterData.setPeerClusterNames(peerClusterNames);
-            // Write back the new updated ClusterData into zookeeper
-            globalZk().setData(clusterPath, 
jsonMapper().writeValueAsBytes(currentClusterData), nodeStat.getVersion());
-            globalZkCache().invalidate(clusterPath);
+            clusterResources().set(path("clusters", cluster), old -> {
+                old.setPeerClusterNames(peerClusterNames);
+                return old;
+            });
             log.info("[{}] Successfully added peer-cluster {} for {}", 
clientAppId(), peerClusterNames, cluster);
-        } catch (KeeperException.NoNodeException e) {
+        } catch (NotFoundException e) {
             log.warn("[{}] Failed to update cluster {}: Does not exist", 
clientAppId(), cluster);
             throw new RestException(Status.NOT_FOUND, "Cluster does not 
exist");
         } catch (Exception e) {
@@ -347,15 +328,10 @@ public class ClustersBase extends AdminResource {
             @PathParam("cluster") String cluster
     ) {
         validateSuperUserAccess();
-
         try {
-            String clusterPath = path("clusters", cluster);
-            byte[] content = globalZk().getData(clusterPath, null, null);
-            ClusterData clusterData = jsonMapper().readValue(content, 
ClusterData.class);
+            ClusterData clusterData = clusterResources().get(path("clusters", 
cluster))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Cluster does not exist"));
             return clusterData.getPeerClusterNames();
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to get cluster {}: Does not exist", 
clientAppId(), cluster);
-            throw new RestException(Status.NOT_FOUND, "Cluster does not 
exist");
         } catch (Exception e) {
             log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, 
e);
             throw new RestException(e);
@@ -388,12 +364,12 @@ public class ClustersBase extends AdminResource {
         // Check that the cluster is not used by any property (eg: no 
namespaces provisioned there)
         boolean isClusterUsed = false;
         try {
-            for (String property : globalZk().getChildren(path(POLICIES), 
false)) {
-                if (globalZk().exists(path(POLICIES, property, cluster), 
false) == null) {
+            for (String property : 
tenantResources().getChildren(path(POLICIES))) {
+                if (!clusterResources().exists(path(POLICIES, property, 
cluster))) {
                     continue;
                 }
 
-                if (!globalZk().getChildren(path(POLICIES, property, cluster), 
false).isEmpty()) {
+                if (!clusterResources().getChildren(path(POLICIES, property, 
cluster)).isEmpty()) {
                     // We found a property that has at least a namespace in 
this cluster
                     isClusterUsed = true;
                     break;
@@ -402,13 +378,12 @@ public class ClustersBase extends AdminResource {
 
             // check the namespaceIsolationPolicies associated with the cluster
             String path = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
-            Optional<NamespaceIsolationPolicies> nsIsolationPolicies = 
namespaceIsolationPoliciesCache().get(path);
+            Optional<NamespaceIsolationPolicies> nsIsolationPolicies = 
namespaceIsolationPolicies().getPolicies(path);
 
             // Need to delete the isolation policies if present
             if (nsIsolationPolicies.isPresent()) {
                 if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
-                    globalZk().delete(path, -1);
-                    namespaceIsolationPoliciesCache().invalidate(path);
+                    namespaceIsolationPolicies().delete(path);
                 } else {
                     isClusterUsed = true;
                 }
@@ -426,10 +401,9 @@ public class ClustersBase extends AdminResource {
         try {
             String clusterPath = path("clusters", cluster);
             deleteFailureDomain(clusterPath);
-            globalZk().delete(clusterPath, -1);
-            globalZkCache().invalidate(clusterPath);
+            clusterResources().delete(clusterPath);
             log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
-        } catch (KeeperException.NoNodeException e) {
+        } catch (NotFoundException e) {
             log.warn("[{}] Failed to delete cluster {} - Does not exist", 
clientAppId(), cluster);
             throw new RestException(Status.NOT_FOUND, "Cluster does not 
exist");
         } catch (Exception e) {
@@ -441,16 +415,14 @@ public class ClustersBase extends AdminResource {
     private void deleteFailureDomain(String clusterPath) {
         try {
             String failureDomain = joinPath(clusterPath, 
ConfigurationCacheService.FAILURE_DOMAIN);
-            if (globalZk().exists(failureDomain, false) == null) {
+            if (!clusterResources().exists(failureDomain)) {
                 return;
             }
-            for (String domain : globalZk().getChildren(failureDomain, false)) 
{
+            for (String domain : 
clusterResources().getChildren(failureDomain)) {
                 String domainPath = joinPath(failureDomain, domain);
-                globalZk().delete(domainPath, -1);
+                clusterResources().delete(domainPath);
             }
-            globalZk().delete(failureDomain, -1);
-            failureDomainCache().clear();
-            failureDomainListCache().clear();
+            clusterResources().delete(failureDomain);
         } catch (Exception e) {
             log.warn("Failed to delete failure-domain under cluster {}", 
clusterPath);
             throw new RestException(e);
@@ -478,13 +450,13 @@ public class ClustersBase extends AdminResource {
         @PathParam("cluster") String cluster
     ) throws Exception {
         validateSuperUserAccess();
-        if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+        if (!clusterResources().exists(path("clusters", cluster))) {
             throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " 
does not exist.");
         }
 
         try {
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
+            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
+                    .getPolicies(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
             // construct the response to Namespace isolation data map
@@ -524,8 +496,8 @@ public class ClustersBase extends AdminResource {
         validateClusterExists(cluster);
 
         try {
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
+            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
+                    .getPolicies(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
             // construct the response to Namespace isolation data map
@@ -577,8 +549,8 @@ public class ClustersBase extends AdminResource {
             throw new RestException(e);
         }
         try {
-            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPoliciesCache()
-                    .get(nsIsolationPoliciesPath);
+            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPolicies()
+                    .getPolicies(nsIsolationPoliciesPath);
             if (!nsPoliciesResult.isPresent()) {
                 throw new RestException(Status.NOT_FOUND, "namespace-isolation 
policies not found for " + cluster);
             }
@@ -639,8 +611,8 @@ public class ClustersBase extends AdminResource {
         final String nsIsolationPoliciesPath = AdminResource.path("clusters", 
cluster, NAMESPACE_ISOLATION_POLICIES);
         Map<String, NamespaceIsolationData> nsPolicies;
         try {
-            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPoliciesCache()
-                    .get(nsIsolationPoliciesPath);
+            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPolicies()
+                    .getPolicies(nsIsolationPoliciesPath);
             if (!nsPoliciesResult.isPresent()) {
                 throw new RestException(Status.NOT_FOUND, "namespace-isolation 
policies not found for " + cluster);
             }
@@ -710,21 +682,18 @@ public class ClustersBase extends AdminResource {
             jsonInput = 
ObjectMapperFactory.create().writeValueAsString(policyData);
 
             String nsIsolationPolicyPath = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(nsIsolationPolicyPath).orElseGet(() -> {
+            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
+                    .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createZnodeIfNotExist(nsIsolationPolicyPath, 
Optional.of(Collections.emptyMap()));
+                            
namespaceIsolationPolicies().create(nsIsolationPolicyPath, 
Collections.emptyMap());
                             return new NamespaceIsolationPolicies();
-                        } catch (KeeperException | InterruptedException e) {
+                        } catch (Exception e) {
                             throw new RestException(e);
                         }
                     });
 
             nsIsolationPolicies.setPolicy(policyName, policyData);
-            globalZk().setData(nsIsolationPolicyPath, 
jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()),
-                    -1);
-            // make sure that the cache content will be refreshed for the next 
read access
-            
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
+            namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> 
nsIsolationPolicies.getPolicies());
 
             // whether or not make the isolation update on time.
             if 
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
@@ -738,7 +707,7 @@ public class ClustersBase extends AdminResource {
                     clientAppId(), cluster, policyName, iae);
             asyncResponse.resume(new RestException(Status.BAD_REQUEST,
                     "Invalid format of input policy data. policy: " + 
policyName + "; data: " + jsonInput));
-        } catch (KeeperException.NoNodeException nne) {
+        } catch (NotFoundException nne) {
             log.warn("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
                     cluster);
             asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -832,29 +801,6 @@ public class ClustersBase extends AdminResource {
         });
     }
 
-    private boolean createZnodeIfNotExist(String path, Optional<Object> value)
-            throws KeeperException, InterruptedException {
-        // create persistent node on ZooKeeper
-        if (globalZk().exists(path, false) == null) {
-            // create all the intermediate nodes
-            try {
-                ZkUtils.createFullPathOptimistic(globalZk(), path,
-                        value.isPresent() ? 
jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-                return true;
-            } catch (KeeperException.NodeExistsException nee) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Other broker preempted the full path [{}] 
already. Continue...", path);
-                }
-            } catch (JsonGenerationException e) {
-                // ignore json error as it is empty hash
-            } catch (JsonMappingException e) {
-            } catch (IOException e) {
-            }
-        }
-        return false;
-    }
-
     @DELETE
     @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
     @ApiOperation(
@@ -886,22 +832,19 @@ public class ClustersBase extends AdminResource {
         try {
 
             String nsIsolationPolicyPath = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(nsIsolationPolicyPath).orElseGet(() -> {
+            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
+                    .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createZnodeIfNotExist(nsIsolationPolicyPath, 
Optional.of(Collections.emptyMap()));
+                            
namespaceIsolationPolicies().create(nsIsolationPolicyPath, 
Collections.emptyMap());
                             return new NamespaceIsolationPolicies();
-                        } catch (KeeperException | InterruptedException e) {
+                        } catch (Exception e) {
                             throw new RestException(e);
                         }
                     });
 
             nsIsolationPolicies.deletePolicy(policyName);
-            globalZk().setData(nsIsolationPolicyPath, 
jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()),
-                    -1);
-            // make sure that the cache content will be refreshed for the next 
read access
-            
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
-        } catch (KeeperException.NoNodeException nne) {
+            namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> 
nsIsolationPolicies.getPolicies());
+        } catch (NotFoundException nne) {
             log.warn("[{}] Failed to update 
brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
                     cluster);
             throw new RestException(Status.NOT_FOUND,
@@ -949,15 +892,9 @@ public class ClustersBase extends AdminResource {
 
         try {
             String domainPath = 
joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, 
domainName);
-            if (this.createZnodeIfNotExist(domainPath, 
Optional.ofNullable(domain))) {
-                // clear domains-children cache
-                this.failureDomainListCache().clear();
-            } else {
-                globalZk().setData(domainPath, 
jsonMapper().writeValueAsBytes(domain), -1);
-                // make sure that the domain-cache will be refreshed for the 
next read access
-                failureDomainCache().invalidate(domainPath);
-            }
-        } catch (KeeperException.NoNodeException nne) {
+            FailureDomainResources failureDomainListCache = 
clusterResources().getFailureDomainResources();
+            failureDomainListCache.create(domainPath, old -> domain);
+        } catch (NotFoundException nne) {
             log.warn("[{}] Failed to update domain {}. clusters {}  Does not 
exist", clientAppId(), cluster,
                     domainName);
             throw new RestException(Status.NOT_FOUND,
@@ -992,16 +929,17 @@ public class ClustersBase extends AdminResource {
         Map<String, FailureDomain> domains = Maps.newHashMap();
         try {
             final String failureDomainRootPath = 
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
-            for (String domainName : failureDomainListCache().get()) {
+            FailureDomainResources failureDomainListCache = 
clusterResources().getFailureDomainResources();
+            for (String domainName : 
failureDomainListCache.getChildren(failureDomainRootPath)) {
                 try {
-                    Optional<FailureDomain> domain = failureDomainCache()
+                    Optional<FailureDomain> domain = failureDomainListCache
                             .get(joinPath(failureDomainRootPath, domainName));
                     domain.ifPresent(failureDomain -> domains.put(domainName, 
failureDomain));
                 } catch (Exception e) {
                     log.warn("Failed to get domain {}", domainName, e);
                 }
             }
-        } catch (KeeperException.NoNodeException e) {
+        } catch (NotFoundException e) {
             log.warn("[{}] Failure-domain is not configured for cluster {}", 
clientAppId(), cluster, e);
             return Collections.emptyMap();
         } catch (Exception e) {
@@ -1041,7 +979,7 @@ public class ClustersBase extends AdminResource {
 
         try {
             final String failureDomainRootPath = 
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
-            return failureDomainCache().get(joinPath(failureDomainRootPath, 
domainName))
+            return 
clusterResources().getFailureDomainResources().get(joinPath(failureDomainRootPath,
 domainName))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "Domain " + domainName + " for cluster " + cluster 
+ " does not exist"));
         } catch (RestException re) {
@@ -1082,11 +1020,8 @@ public class ClustersBase extends AdminResource {
         try {
             final String domainPath = 
joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
                     domainName);
-            globalZk().delete(domainPath, -1);
-            // clear domain cache
-            failureDomainCache().invalidate(domainPath);
-            failureDomainListCache().clear();
-        } catch (KeeperException.NoNodeException nne) {
+            clusterResources().getFailureDomainResources().delete(domainPath);
+        } catch (NotFoundException nne) {
             log.warn("[{}] Domain {} does not exist in {}", clientAppId(), 
domainName, cluster);
             throw new RestException(Status.NOT_FOUND,
                     "Domain-name " + domainName + " or cluster " + cluster + " 
does not exist");
@@ -1101,13 +1036,14 @@ public class ClustersBase extends AdminResource {
         if (inputDomain != null && inputDomain.brokers != null) {
             try {
                 final String failureDomainRootPath = 
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
-                for (String domainName : failureDomainListCache().get()) {
+                for (String domainName : 
clusterResources().getFailureDomainResources()
+                        .getChildren(failureDomainRootPath)) {
                     if (inputDomainName.equals(domainName)) {
                         continue;
                     }
                     try {
                         Optional<FailureDomain> domain =
-                                failureDomainCache()
+                                clusterResources().getFailureDomainResources()
                                         .get(joinPath(failureDomainRootPath, 
domainName));
                         if (domain.isPresent() && domain.get().brokers != 
null) {
                             List<String> duplicateBrokers = 
domain.get().brokers.stream().parallel()
@@ -1124,7 +1060,7 @@ public class ClustersBase extends AdminResource {
                         log.warn("Failed to get domain {}", domainName, e);
                     }
                 }
-            } catch (KeeperException.NoNodeException e) {
+            } catch (NotFoundException e) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Domain is not configured for cluster", 
clientAppId(), e);
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
new file mode 100644
index 0000000..966d421
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class NamespaceResources extends BaseResources<Policies> {
+    @Getter
+    private IsolationPolicyResources isolationPolicies;
+
+    public NamespaceResources(MetadataStoreExtended store) {
+        super(store, Policies.class);
+        isolationPolicies = new IsolationPolicyResources(store);
+    }
+
+    public static class IsolationPolicyResources extends 
BaseResources<Map<String, NamespaceIsolationData>> {
+        public IsolationPolicyResources(MetadataStoreExtended store) {
+            super(store, new TypeReference<Map<String, 
NamespaceIsolationData>>() {
+            });
+        }
+
+        public Optional<NamespaceIsolationPolicies> getPolicies(String path) 
throws MetadataStoreException {
+            Optional<Map<String, NamespaceIsolationData>> data = 
super.get(path);
+            return data.isPresent() ? Optional.of(new 
NamespaceIsolationPolicies(data.get())) : Optional.empty();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
new file mode 100644
index 0000000..4384762
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+@Getter(AccessLevel.PUBLIC)
+public class PulsarResources {
+
+    private TenantResources tenatResources;
+    private ClusterResources clusterResources;
+    private NamespaceResources namespaceResources;
+
+    public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
+        tenatResources = new TenantResources(configurationMetadataStore);
+        clusterResources = new ClusterResources(configurationMetadataStore);
+        namespaceResources = new 
NamespaceResources(configurationMetadataStore);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
new file mode 100644
index 0000000..1a4fc38
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class TenantResources extends BaseResources<TenantInfo> {
+    public TenantResources(MetadataStoreExtended store) {
+        super(store, TenantInfo.class);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index e1c270d..9c28046 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -24,8 +24,10 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -33,35 +35,45 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TenantsBase extends AdminResource {
+public class TenantsBase extends PulsarWebResource {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TenantsBase.class);
 
     @GET
     @ApiOperation(value = "Get the list of existing tenants.", response = 
String.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant doesn't exist") })
-    public List<String> getTenants() {
-        validateSuperUserAccess();
-
+    public void getTenants(@Suspended final AsyncResponse asyncResponse) {
+        final String clientAppId = clientAppId();
         try {
-            List<String> tenants = globalZk().getChildren(path(POLICIES), 
false);
-            tenants.sort(null);
-            return tenants;
+            validateSuperUserAccess();
         } catch (Exception e) {
-            log.error("[{}] Failed to get tenants list", clientAppId(), e);
-            throw new RestException(e);
+            asyncResponse.resume(e);
+            return;
         }
+        
tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) -> 
{
+            if (e != null) {
+                log.error("[{}] Failed to get tenants list", clientAppId, e);
+                asyncResponse.resume(new RestException(e));
+                return;
+            }
+            tenants.sort(null);
+            asyncResponse.resume(tenants);
+        });
     }
 
     @GET
@@ -69,18 +81,25 @@ public class TenantsBase extends AdminResource {
     @ApiOperation(value = "Get the admin configuration for a given tenant.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist") })
-    public TenantInfo getTenantAdmin(
-        @ApiParam(value = "The tenant name")
-        @PathParam("tenant") String tenant) {
-        validateSuperUserAccess();
-
+    public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant) {
+        final String clientAppId = clientAppId();
         try {
-            return tenantsCache().get(path(POLICIES, tenant))
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Tenant does not exist"));
+            validateSuperUserAccess();
         } catch (Exception e) {
-            log.error("[{}] Failed to get tenant {}", clientAppId(), tenant, 
e);
-            throw new RestException(e);
+            asyncResponse.resume(e);
         }
+
+        tenantResources().getAsync(path(POLICIES, 
tenant)).whenComplete((tenantInfo, e) -> {
+            if (e != null) {
+                log.error("[{}] Failed to get Tenant {}", clientAppId, 
e.getMessage());
+                asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant"));
+                return;
+            }
+            boolean response = tenantInfo.isPresent() ? 
asyncResponse.resume(tenantInfo.get())
+                    : asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Tenant does not exist"));
+            return;
+        });
     }
 
     @PUT
@@ -91,103 +110,113 @@ public class TenantsBase extends AdminResource {
             @ApiResponse(code = 412, message = "Tenant name is not valid"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
             @ApiResponse(code = 412, message = "Clusters do not exist") })
-    public void createTenant(
-        @ApiParam(value = "The tenant name")
-        @PathParam("tenant") String tenant,
-        @ApiParam(value = "TenantInfo") TenantInfo config) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
-        validateClusters(config);
+    public void createTenant(@Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant,
+            @ApiParam(value = "TenantInfo") TenantInfo tenantInfo) {
 
+        final String clientAppId = clientAppId();
         try {
+            validateSuperUserAccess();
+            validatePoliciesReadOnlyAccess();
+            validateClusters(tenantInfo);
             NamedEntity.checkName(tenant);
+        } catch (IllegalArgumentException e) {
+            log.warn("[{}] Failed to create tenant with invalid name {}", 
clientAppId(), tenant, e);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, 
"Tenant name is not valid"));
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(e);
+            return;
+        }
+
+        
tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) -> 
{
+            if (e != null) {
+                log.error("[{}] Failed to create tenant ", clientAppId, 
e.getCause());
+                asyncResponse.resume(new RestException(e));
+                return;
+            }
 
             int maxTenants = pulsar().getConfiguration().getMaxTenants();
-            //Due to the cost of distributed locks, no locks are added here.
-            //In a concurrent scenario, the threshold will be exceeded.
+            // Due to the cost of distributed locks, no locks are added here.
+            // In a concurrent scenario, the threshold will be exceeded.
             if (maxTenants > 0) {
-                List<String> tenants = globalZk().getChildren(path(POLICIES), 
false);
                 if (tenants != null && tenants.size() >= maxTenants) {
-                    throw new RestException(Status.PRECONDITION_FAILED, 
"Exceed the maximum number of tenants");
+                    asyncResponse.resume(
+                            new RestException(Status.PRECONDITION_FAILED, 
"Exceed the maximum number of tenants"));
+                    return;
                 }
             }
-            zkCreate(path(POLICIES, tenant), 
jsonMapper().writeValueAsBytes(config));
-            log.info("[{}] Created tenant {}", clientAppId(), tenant);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing tenant {}", 
clientAppId(), tenant);
-            throw new RestException(Status.CONFLICT, "Tenant already exists");
-        } catch (IllegalArgumentException e) {
-            log.warn("[{}] Failed to create tenant with invalid name {}", 
clientAppId(), tenant, e);
-            throw new RestException(Status.PRECONDITION_FAILED, "Tenant name 
is not valid");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create tenant {}", clientAppId(), 
tenant, e);
-            throw new RestException(e);
-        }
+            tenantResources().existsAsync(path(POLICIES, 
tenant)).thenAccept(exist ->{
+                if (exist) {
+                    asyncResponse.resume(new RestException(Status.CONFLICT, 
"Tenant already exist"));
+                    return;
+                }
+                tenantResources().createAsync(path(POLICIES, tenant), 
tenantInfo).thenAccept((r) -> {
+                    log.info("[{}] Created tenant {}", clientAppId(), tenant);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to create tenant {}", clientAppId, 
tenant, e);
+                    asyncResponse.resume(new RestException(ex));
+                    return null;
+                });
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create tenant {}", clientAppId(), 
tenant, e);
+                asyncResponse.resume(new RestException(ex));
+                return null;
+            });
+        });
     }
 
     @POST
     @Path("/{tenant}")
     @ApiOperation(value = "Update the admins for a tenant.",
-            notes = "This operation requires Pulsar super-user privileges.")
+    notes = "This operation requires Pulsar super-user privileges.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
             @ApiResponse(code = 412, message = "Clusters do not exist") })
-    public void updateTenant(
-        @ApiParam(value = "The tenant name")
-        @PathParam("tenant") String tenant,
-        @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
-        validateClusters(newTenantAdmin);
-
-        Stat nodeStat = new Stat();
+    public void updateTenant(@Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant,
+            @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) {
         try {
-            byte[] content = globalZk().getData(path(POLICIES, tenant), null, 
nodeStat);
-            TenantInfo oldTenantAdmin = jsonMapper().readValue(content, 
TenantInfo.class);
-            List<String> clustersWithActiveNamespaces = Lists.newArrayList();
-            if (oldTenantAdmin.getAllowedClusters().size() > 
newTenantAdmin.getAllowedClusters().size()) {
-                // Get the colo(s) being removed from the list
-                
oldTenantAdmin.getAllowedClusters().removeAll(newTenantAdmin.getAllowedClusters());
-                log.debug("Following clusters are being removed : [{}]", 
oldTenantAdmin.getAllowedClusters());
-                for (String cluster : oldTenantAdmin.getAllowedClusters()) {
-                    if (Constants.GLOBAL_CLUSTER.equals(cluster)) {
-                        continue;
-                    }
-                    List<String> activeNamespaces = Lists.newArrayList();
-                    try {
-                        activeNamespaces = 
globalZk().getChildren(path(POLICIES, tenant, cluster), false);
-                        if (activeNamespaces.size() != 0) {
-                            // There are active namespaces in this cluster
-                            clustersWithActiveNamespaces.add(cluster);
-                        }
-                    } catch (KeeperException.NoNodeException nne) {
-                        // Fine, some cluster does not have active namespace. 
Move on!
-                    }
-                }
-                if (!clustersWithActiveNamespaces.isEmpty()) {
-                    // Throw an exception because colos being removed are 
having active namespaces
-                    String msg = String.format(
-                            "Failed to update the tenant because active 
namespaces are present in colos %s."
-                                    + " Please delete those namespaces first",
-                            clustersWithActiveNamespaces);
-                    throw new RestException(Status.CONFLICT, msg);
-                }
-            }
-            String tenantPath = path(POLICIES, tenant);
-            globalZk().setData(tenantPath, 
jsonMapper().writeValueAsBytes(newTenantAdmin), -1);
-            globalZkCache().invalidate(tenantPath);
-            log.info("[{}] updated tenant {}", clientAppId(), tenant);
-        } catch (RestException re) {
-            throw re;
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update tenant {}: does not exist", 
clientAppId(), tenant);
-            throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+            validateSuperUserAccess();
+            validatePoliciesReadOnlyAccess();
+            validateClusters(newTenantAdmin);
         } catch (Exception e) {
-            log.error("[{}] Failed to update tenant {}", clientAppId(), 
tenant, e);
-            throw new RestException(e);
+            asyncResponse.resume(e);
+            return;
         }
+
+        final String clientAddId = clientAppId();
+        tenantResources().getAsync(path(POLICIES, 
tenant)).thenAccept(tenantAdmin -> {
+            if (!tenantAdmin.isPresent()) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Tenant " + tenant + " not found"));
+                return;
+            }
+            TenantInfo oldTenantAdmin = tenantAdmin.get();
+            Set<String> newClusters = new 
HashSet<>(newTenantAdmin.getAllowedClusters());
+            canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), 
newClusters).thenApply(r -> {
+                tenantResources().setAsync(path(POLICIES, tenant), old -> {
+                    return newTenantAdmin;
+                }).thenAccept(done -> {
+                    log.info("Successfully updated tenant info {}", tenant);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    log.warn("Failed to update tenant {}", tenant, 
ex.getCause());
+                    asyncResponse.resume(new RestException(ex));
+                    return null;
+                });
+                return null;
+            }).exceptionally(nsEx -> {
+                asyncResponse.resume(nsEx.getCause());
+                return null;
+            });
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to get tenant {}", clientAddId, tenant, 
ex.getCause());
+            asyncResponse.resume(new RestException(ex));
+            return null;
+        });
     }
 
     @DELETE
@@ -196,47 +225,59 @@ public class TenantsBase extends AdminResource {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 409, message = "The tenant still has active 
namespaces") })
-    public void deleteTenant(
-        @PathParam("tenant")
-        @ApiParam(value = "The tenant name")
-        String tenant) {
-        validateSuperUserAccess();
-        validatePoliciesReadOnlyAccess();
-
-        boolean isTenantEmpty;
+    public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") @ApiParam(value = "The tenant name") String 
tenant) {
         try {
-            isTenantEmpty = getListOfNamespaces(tenant).isEmpty();
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to delete tenant {}: does not exist", 
clientAppId(), tenant);
-            throw new RestException(Status.NOT_FOUND, "The tenant does not 
exist");
+            validateSuperUserAccess();
+            validatePoliciesReadOnlyAccess();
         } catch (Exception e) {
-            log.error("[{}] Failed to get tenant status {}", clientAppId(), 
tenant, e);
-            throw new RestException(e);
+            asyncResponse.resume(e);
+            return;
         }
 
-        if (!isTenantEmpty) {
-            log.warn("[{}] Failed to delete tenant {}: not empty", 
clientAppId(), tenant);
-            throw new RestException(Status.CONFLICT, "The tenant still has 
active namespaces");
-        }
-
-        try {
-            // First try to delete every cluster z-node
-            for (String cluster : globalZk().getChildren(path(POLICIES, 
tenant), false)) {
-                globalZk().delete(path(POLICIES, tenant, cluster), -1);
+        tenantResources().existsAsync(path(POLICIES, tenant)).thenApply(exists 
->{
+            if (!exists) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Tenant doesn't exist"));
+                return null;
             }
-
-            globalZk().delete(path(POLICIES, tenant), -1);
-            log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete tenant {}", clientAppId(), 
tenant, e);
-            throw new RestException(e);
-        }
+            return hasActiveNamespace(tenant).thenAccept(ns -> {
+                try {
+                    // already fetched children and they should be in the cache
+                    List<CompletableFuture<Void>> clusterList = 
Lists.newArrayList();
+                    for (String cluster : 
tenantResources().getChildrenAsync(path(POLICIES, tenant)).get()) {
+                        
clusterList.add(tenantResources().deleteAsync(path(POLICIES, tenant, cluster)));
+                    }
+                    FutureUtil.waitForAll(clusterList).thenAccept(c -> {
+                        tenantResources().deleteAsync(path(POLICIES, 
tenant)).thenAccept(t -> {
+                            log.info("[{}] Deleted tenant {}", clientAppId(), 
tenant);
+                            asyncResponse.resume(Response.noContent().build());
+                        }).exceptionally(ex -> {
+                            log.error("Failed to delete tenant {}", tenant, 
ex.getCause());
+                            asyncResponse.resume(new RestException(ex));
+                            return null;
+                        });
+                    }).exceptionally(ex -> {
+                        log.error("Failed to delete clusters under tenant {}", 
tenant, ex.getCause());
+                        asyncResponse.resume(new RestException(ex));
+                        return null;
+                    });
+                    log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
+                } catch (Exception e) {
+                    log.error("[{}] Failed to delete tenant {}", 
clientAppId(), tenant, e);
+                    asyncResponse.resume(new RestException(e));
+                }
+            }).exceptionally(ex -> {
+                log.error("Failed to delete tenant due to active namespace 
{}", tenant, ex.getCause());
+                asyncResponse.resume(new RestException(ex));
+                return null;
+            });
+        });
     }
 
     private void validateClusters(TenantInfo info) {
         // empty cluster shouldn't be allowed
-        if (info == null || info.getAllowedClusters().stream()
-                .filter(c -> 
!StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
+        if (info == null || info.getAllowedClusters().stream().filter(c -> 
!StringUtils.isBlank(c))
+                .collect(Collectors.toSet()).isEmpty()
                 || info.getAllowedClusters().stream().anyMatch(ac -> 
StringUtils.isBlank(ac))) {
             log.warn("[{}] Failed to validate due to clusters are empty", 
clientAppId());
             throw new RestException(Status.PRECONDITION_FAILED, "Clusters can 
not be empty");
@@ -244,11 +285,11 @@ public class TenantsBase extends AdminResource {
 
         List<String> nonexistentClusters;
         try {
-            Set<String> availableClusters = clustersListCache().get();
+            Set<String> availableClusters = clusterResources().list();
             Set<String> allowedClusters = info.getAllowedClusters();
-            nonexistentClusters = allowedClusters.stream()
-                .filter(cluster -> !(availableClusters.contains(cluster) || 
Constants.GLOBAL_CLUSTER.equals(cluster)))
-                .collect(Collectors.toList());
+            nonexistentClusters = allowedClusters.stream().filter(
+                    cluster -> !(availableClusters.contains(cluster) || 
Constants.GLOBAL_CLUSTER.equals(cluster)))
+                    .collect(Collectors.toList());
         } catch (Exception e) {
             log.error("[{}] Failed to get available clusters", clientAppId(), 
e);
             throw new RestException(e);
@@ -258,6 +299,4 @@ public class TenantsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED, "Clusters do 
not exist");
         }
     }
-
-    private static final Logger log = 
LoggerFactory.getLogger(TenantsBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index f10431c..1a307a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,12 +21,16 @@ package org.apache.pulsar.broker.web;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static 
org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -44,6 +48,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.ClusterResources;
+import org.apache.pulsar.broker.admin.impl.NamespaceResources;
+import 
org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources;
+import org.apache.pulsar.broker.admin.impl.TenantResources;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -65,7 +73,9 @@ import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
-import org.apache.zookeeper.KeeperException;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,7 +179,7 @@ public abstract class PulsarWebResource {
      * @throws WebApplicationException
      *             if not authorized
      */
-    protected void validateSuperUserAccess() {
+    public void validateSuperUserAccess() {
         if (config().isAuthenticationEnabled()) {
             String appId = clientAppId();
             if (log.isDebugEnabled()) {
@@ -245,15 +255,8 @@ public abstract class PulsarWebResource {
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
 
-        TenantInfo tenantInfo;
-
-        try {
-            tenantInfo = 
pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, tenant))
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Tenant does not exist"));
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("Failed to get tenant info data for non existing tenant 
{}", tenant);
-            throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
-        }
+        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenatResources().get(path(POLICIES, tenant))
+                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
 
         if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId)) {
@@ -308,7 +311,7 @@ public abstract class PulsarWebResource {
     protected void validateClusterForTenant(String tenant, String cluster) {
         TenantInfo tenantInfo;
         try {
-            tenantInfo = 
pulsar().getConfigurationCache().propertiesCache().get(path(POLICIES, tenant))
+            tenantInfo = 
pulsar().getPulsarResources().getTenatResources().get(path(POLICIES, tenant))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Tenant does not exist"));
         } catch (RestException e) {
             log.warn("Failed to get tenant admin data for tenant {}", tenant);
@@ -859,4 +862,134 @@ public abstract class PulsarWebResource {
             }
         }
     }
+
+    protected TenantResources tenantResources() {
+        return pulsar().getPulsarResources().getTenatResources();
+    }
+
+    protected ClusterResources clusterResources() {
+        return pulsar().getPulsarResources().getClusterResources();
+    }
+
+    protected NamespaceResources namespaceResources() {
+        return pulsar().getPulsarResources().getNamespaceResources();
+    }
+
+    protected IsolationPolicyResources namespaceIsolationPolicies(){
+        return namespaceResources().getIsolationPolicies();
+    }
+
+    public static ObjectMapper jsonMapper() {
+        return ObjectMapperFactory.getThreadLocal();
+    }
+
+    public void validatePoliciesReadOnlyAccess() {
+        try {
+            if 
(clusterResources().existsAsync(AdminResource.POLICIES_READONLY_FLAG_PATH).get())
 {
+                log.debug("Policies are read-only. Broker cannot do read-write 
operations");
+                throw new RestException(Status.FORBIDDEN, "Broker is forbidden 
to do read-write operations");
+            }
+        } catch (Exception e) {
+            log.warn("Unable to fetch read-only policy config {}", 
POLICIES_READONLY_FLAG_PATH, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected CompletableFuture<Void> hasActiveNamespace(String tenant) {
+        CompletableFuture<Void> activeNamespaceFuture = new 
CompletableFuture<>();
+        tenantResources().getChildrenAsync(path(POLICIES, 
tenant)).thenAccept(clusterOrNamespaceList -> {
+            if (clusterOrNamespaceList == null || 
clusterOrNamespaceList.isEmpty()) {
+                activeNamespaceFuture.complete(null);
+                return;
+            }
+            List<CompletableFuture<Void>> activeNamespaceListFuture = 
Lists.newArrayList();
+            clusterOrNamespaceList.forEach(clusterOrNamespace -> {
+                // get list of active V1 namespace
+                CompletableFuture<Void> checkNs = new CompletableFuture<>();
+                activeNamespaceListFuture.add(checkNs);
+                tenantResources().getChildrenAsync(path(POLICIES, tenant, 
clusterOrNamespace))
+                        .whenComplete((children, ex) -> {
+                            if (ex != null) {
+                                checkNs.completeExceptionally(ex);
+                                return;
+                            }
+                            if (children != null && !children.isEmpty()) {
+                                checkNs.completeExceptionally(
+                                        new 
RestException(Status.PRECONDITION_FAILED, "Tenant has active namespace"));
+                                return;
+                            }
+                            String namespace = NamespaceName.get(tenant, 
clusterOrNamespace).toString();
+                            // if the length is 0 then this is probably a 
leftover cluster from namespace
+                            // created
+                            // with the v1 admin format (prop/cluster/ns) and 
then deleted, so no need to
+                            // add it to the list
+                            namespaceResources().getAsync(path(POLICIES, 
namespace)).thenApply(data -> {
+                                if (data.isPresent()) {
+                                    checkNs.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                            "Tenant has active namespace"));
+                                } else {
+                                    checkNs.complete(null);
+                                }
+                                return null;
+                            }).exceptionally(ex2 -> {
+                                if (ex2.getCause() instanceof 
MetadataStoreException.ContentDeserializationException) {
+                                    // it's not a valid namespace-node
+                                    checkNs.complete(null);
+                                } else {
+                                    checkNs.completeExceptionally(
+                                            new 
RestException(Status.INTERNAL_SERVER_ERROR, ex2.getCause()));
+                                }
+                                return null;
+                            });
+                        });
+                FutureUtil.waitForAll(activeNamespaceListFuture).thenAccept(r 
-> {
+                    activeNamespaceFuture.complete(null);
+                }).exceptionally(ex -> {
+                    activeNamespaceFuture.completeExceptionally(ex.getCause());
+                    return null;
+                });
+            });
+        }).exceptionally(ex -> {
+            activeNamespaceFuture.completeExceptionally(ex.getCause());
+            return null;
+        });
+        return activeNamespaceFuture;
+    }
+
+    protected void validateClusterExists(String cluster) {
+        try {
+            if (!clusterResources().get(path("clusters", 
cluster)).isPresent()) {
+                throw new RestException(Status.PRECONDITION_FAILED, "Cluster " 
+ cluster + " does not exist.");
+            }
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
+    protected CompletableFuture<Void> canUpdateCluster(String tenant, 
Set<String> oldClusters,
+            Set<String> newClusters) {
+        List<CompletableFuture<Void>> activeNamespaceFuture = 
Lists.newArrayList();
+        for (String cluster : oldClusters) {
+            if (Constants.GLOBAL_CLUSTER.equals(cluster) || 
newClusters.contains(cluster)) {
+                continue;
+            }
+            CompletableFuture<Void> checkNs = new CompletableFuture<>();
+            activeNamespaceFuture.add(checkNs);
+            tenantResources().getChildrenAsync(path(POLICIES, tenant, 
cluster)).whenComplete((activeNamespaces, ex) -> {
+                if (ex != null) {
+                    log.warn("Failed to get namespaces under {}-{}, {}", 
tenant, cluster, ex.getCause().getMessage());
+                    checkNs.completeExceptionally(ex.getCause());
+                    return;
+                }
+                if (activeNamespaces.size() > 0) {
+                    log.warn("{}/{} Active-namespaces {}", tenant, cluster, 
activeNamespaces);
+                    checkNs.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED, "Active namespaces"));
+                    return;
+                }
+                checkNs.complete(null);
+            });
+        }
+        return activeNamespaceFuture.isEmpty() ? 
CompletableFuture.completedFuture(null)
+                : FutureUtil.waitForAll(activeNamespaceFuture);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 8179afa..268a4ca 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -339,7 +339,6 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
             } catch (PulsarAdminException e) {
                 assertTrue(e instanceof NotFoundException);
             }
-
             // verify delete cluster failed
             try {
                 admin.clusters().deleteCluster("test");
@@ -626,6 +625,13 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(enabled = true)
     public void properties() throws PulsarAdminException {
+        try {
+            admin.tenants().getTenantInfo("does-not-exist");
+            fail("should have failed");
+        } catch (PulsarAdminException e) {
+            assertTrue(e instanceof NotFoundException);
+        }
+        
         Set<String> allowedClusters = Sets.newHashSet("test");
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), allowedClusters);
         admin.tenants().updateTenant("prop-xyz", tenantInfo);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 047a174..f51eed1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -43,11 +43,15 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
@@ -82,12 +86,13 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.stats.AllocatorStats;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -129,18 +134,15 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
 
         clusters = spy(new Clusters());
         clusters.setPulsar(pulsar);
-        doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
+        /*doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
         
doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
         
doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
-        
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();
+        
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/
         doReturn("test").when(clusters).clientAppId();
         doNothing().when(clusters).validateSuperUserAccess();
 
         properties = spy(new Properties());
-        properties.setServletContext(new MockServletContext());
         properties.setPulsar(pulsar);
-        doReturn(mockZooKeeperGlobal).when(properties).globalZk();
-        
doReturn(configurationCache.propertiesCache()).when(properties).tenantsCache();
         doReturn("test").when(properties).clientAppId();
         doNothing().when(properties).validateSuperUserAccess();
 
@@ -239,7 +241,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         clusters.createCluster("use", new 
ClusterData("http://broker.messaging.use.example.com";));
         verify(clusters, times(1)).validateSuperUserAccess();
         // ensure to read from ZooKeeper directly
-        clusters.clustersListCache().clear();
+        //clusters.clustersListCache().clear();
         assertEquals(clusters.getClusters(), Lists.newArrayList("use"));
 
         // Check creating existing cluster
@@ -329,6 +331,14 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
                     && path.equals("/admin/clusters");
             });
         configurationCache.clustersListCache().clear();
+        // clear caches to load data from metadata-store again
+        MetadataCacheImpl<ClusterData> clusterCache = 
(MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources()
+                .getClusterResources().getCache();
+        MetadataCacheImpl isolationPolicyCache = (MetadataCacheImpl) 
pulsar.getPulsarResources()
+                .getNamespaceResources().getIsolationPolicies().getCache();
+        AbstractMetadataStore store = (AbstractMetadataStore) 
clusterCache.getStore();
+        clusterCache.invalidateAll();
+        store.invalidateAll();
         try {
             clusters.getClusters();
             fail("should have failed");
@@ -351,6 +361,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 return op == MockZooKeeper.Op.GET
                     && path.equals("/admin/clusters/test");
             });
+        clusterCache.invalidateAll();
+        store.invalidateAll();
         try {
             clusters.updateCluster("test", new 
ClusterData("http://broker.messaging.test.example.com";));
             fail("should have failed");
@@ -386,6 +398,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 return op == MockZooKeeper.Op.GET
                     && 
path.equals("/admin/clusters/use/namespaceIsolationPolicies");
             });
+        clusterCache.invalidateAll();
+        isolationPolicyCache.invalidateAll();
+        store.invalidateAll();
         try {
             clusters.deleteCluster("use");
             fail("should have failed");
@@ -402,9 +417,19 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         }
     }
 
+    Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception 
{
+        TestAsyncResponse ctx = new TestAsyncResponse();
+        function.accept(ctx);
+        ctx.latch.await();
+        if (ctx.e != null) {
+            throw (Exception) ctx.e;
+        }
+        return ctx.response;
+    }
     @Test
-    public void properties() throws Exception {
-        assertEquals(properties.getTenants(), Lists.newArrayList());
+    public void properties() throws Throwable {
+        Object response = asynRequests(ctx -> properties.getTenants(ctx));
+        assertEquals(response, Lists.newArrayList());
         verify(properties, times(1)).validateSuperUserAccess();
 
         // create local cluster
@@ -413,29 +438,33 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         Set<String> allowedClusters = Sets.newHashSet();
         allowedClusters.add(configClusterName);
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), allowedClusters);
-        properties.createTenant("test-property", tenantInfo);
+        response = asynRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
         verify(properties, times(2)).validateSuperUserAccess();
 
-        assertEquals(properties.getTenants(), 
Lists.newArrayList("test-property"));
+        response = asynRequests(ctx -> properties.getTenants(ctx));
+        assertEquals(response, Lists.newArrayList("test-property"));
         verify(properties, times(3)).validateSuperUserAccess();
 
-        assertEquals(properties.getTenantAdmin("test-property"), tenantInfo);
+        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        assertEquals(response, tenantInfo);
         verify(properties, times(4)).validateSuperUserAccess();
 
-        TenantInfo newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", 
"other-role"), allowedClusters);
-        properties.updateTenant("test-property", newPropertyAdmin);
+        final TenantInfo newPropertyAdmin = new 
TenantInfo(Sets.newHashSet("role1", "other-role"), allowedClusters);
+        response = asynRequests(ctx -> properties.updateTenant(ctx, 
"test-property", newPropertyAdmin));
         verify(properties, times(5)).validateSuperUserAccess();
 
         // Wait for updateTenant to take effect
         Thread.sleep(100);
 
-        assertEquals(properties.getTenantAdmin("test-property"), 
newPropertyAdmin);
-        assertNotSame(properties.getTenantAdmin("test-property"), tenantInfo);
+        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        assertEquals(response, newPropertyAdmin);
+        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        assertNotSame(response, tenantInfo);
         verify(properties, times(7)).validateSuperUserAccess();
 
         // Check creating existing property
         try {
-            properties.createTenant("test-property", tenantInfo);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
@@ -443,14 +472,14 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
 
         // Check non-existing property
         try {
-            properties.getTenantAdmin("non-existing");
+            response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"non-existing"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
         try {
-            properties.updateTenant("xxx-non-existing", newPropertyAdmin);
+            response = asynRequests(ctx -> properties.updateTenant(ctx, 
"xxx-non-existing", newPropertyAdmin));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
@@ -458,93 +487,97 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
 
         // Check deleting non-existing property
         try {
-            properties.deleteTenant("non-existing");
+            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"non-existing"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
+        // clear caches to load data from metadata-store again
+        MetadataCacheImpl<TenantInfo> cache = (MetadataCacheImpl<TenantInfo>) 
pulsar.getPulsarResources()
+                .getTenatResources().getCache();
+        AbstractMetadataStore store = (AbstractMetadataStore) cache.getStore();
+        cache.invalidateAll();
+        store.invalidateAll();
         // Test zk failures
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.GET_CHILDREN
-                    && path.equals("/admin/policies");
-            });
+            return op == MockZooKeeper.Op.GET_CHILDREN && 
path.equals("/admin/policies");
+        });
         try {
-            properties.getTenants();
+            response = asynRequests(ctx -> properties.getTenants(ctx));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.GET
-                    && path.equals("/admin/policies/my-tenant");
-            });
+            return op == MockZooKeeper.Op.GET && 
path.equals("/admin/policies/my-tenant");
+        });
         try {
-            properties.getTenantAdmin("my-tenant");
+            response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"my-tenant"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.GET
-                    && path.equals("/admin/policies/my-tenant");
-            });
+            return op == MockZooKeeper.Op.GET && 
path.equals("/admin/policies/my-tenant");
+        });
         try {
-            properties.updateTenant("my-tenant", newPropertyAdmin);
+            response = asynRequests(ctx -> properties.updateTenant(ctx, 
"my-tenant", newPropertyAdmin));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.CREATE
-                    && path.equals("/admin/policies/test");
-            });
+            return op == MockZooKeeper.Op.CREATE && 
path.equals("/admin/policies/test");
+        });
         try {
-            properties.createTenant("test", tenantInfo);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.GET_CHILDREN
-                    && path.equals("/admin/policies/my-tenant");
-            });
+            return op == MockZooKeeper.Op.GET_CHILDREN && 
path.equals("/admin/policies/test-property");
+        });
         try {
-            properties.deleteTenant("my-tenant");
+            cache.invalidateAll();
+            store.invalidateAll();
+            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"test-property"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        properties.createTenant("error-property", tenantInfo);
+        response = asynRequests(ctx -> properties.createTenant(ctx, 
"error-property", tenantInfo));
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
-                return op == MockZooKeeper.Op.DELETE
-                    && path.equals("/admin/policies/error-property");
-            });
+            return op == MockZooKeeper.Op.DELETE && 
path.equals("/admin/policies/error-property");
+        });
         try {
-            properties.deleteTenant("error-property");
+            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"error-property"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        properties.deleteTenant("test-property");
-        properties.deleteTenant("error-property");
-        assertEquals(properties.getTenants(), Lists.newArrayList());
+        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"test-property"));
+        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"error-property"));
+        response = Lists.newArrayList();
+        response = asynRequests(ctx -> properties.getTenants(ctx));
+        assertEquals(response, Lists.newArrayList());
 
         // Create a namespace to test deleting a non-empty property
-        newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", 
"other-role"), Sets.newHashSet("use"));
-        properties.createTenant("my-tenant", newPropertyAdmin);
+        TenantInfo newPropertyAdmin2 = new TenantInfo(Sets.newHashSet("role1", 
"other-role"), Sets.newHashSet("use"));
+        response = asynRequests(ctx -> properties.createTenant(ctx, 
"my-tenant", newPropertyAdmin2));
 
         namespaces.createNamespace("my-tenant", "use", "my-namespace", new 
BundlesData());
 
         try {
-            properties.deleteTenant("my-tenant");
+            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant"));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -552,7 +585,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check name validation
         try {
-            properties.createTenant("test&", tenantInfo);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test&", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -560,7 +593,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check tenantInfo is null
         try {
-            properties.createTenant("tenant-config-is-null", null);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-null", null));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -571,7 +604,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         Set<String> blankClusters = Sets.newHashSet(blankCluster);
         TenantInfo tenantWithEmptyCluster = new 
TenantInfo(Sets.newHashSet("role1", "role2"), blankClusters);
         try {
-            properties.createTenant("tenant-config-is-empty", 
tenantWithEmptyCluster);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-empty", tenantWithEmptyCluster));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -582,18 +615,18 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         containBlankClusters.add(configClusterName);
         TenantInfo tenantContainEmptyCluster = new 
TenantInfo(Sets.newHashSet(), containBlankClusters);
         try {
-            properties.createTenant("tenant-config-contain-empty", 
tenantContainEmptyCluster);
+            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-contain-empty", tenantContainEmptyCluster));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
 
-        AsyncResponse response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, "my-tenant", "use", 
"my-namespace", false, false);
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response2, "my-tenant", "use", 
"my-namespace", false, false);
         ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
-        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        verify(response2, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getStatus(), 
Status.NO_CONTENT.getStatusCode());
-        properties.deleteTenant("my-tenant");
+        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant"));
     }
 
     @Test
@@ -654,9 +687,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         // create policies
         TenantInfo admin = new TenantInfo();
         admin.getAllowedClusters().add(cluster);
-        mockZooKeeperGlobal.create(PulsarWebResource.path(POLICIES, property),
-                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin), 
Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+        ClusterData clusterData = new ClusterData(cluster);
+        clusters.createCluster(cluster, clusterData );
+        asynRequests(ctx -> properties.createTenant(ctx, property, admin));
 
         // customized bandwidth for this namespace
         double customizeBandwidth = 3000;
@@ -762,4 +795,85 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
 
     }
 
+    static class TestAsyncResponse implements AsyncResponse {
+
+        Object response;
+        Throwable e;
+        CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public boolean resume(Object response) {
+            this.response = response;
+            latch.countDown();
+            return true;
+        }
+
+        @Override
+        public boolean resume(Throwable response) {
+            this.e = response;
+            latch.countDown();
+            return true;
+        }
+
+        @Override
+        public boolean cancel() {
+            return false;
+        }
+
+        @Override
+        public boolean cancel(int retryAfter) {
+            return false;
+        }
+
+        @Override
+        public boolean cancel(Date retryAfter) {
+            return false;
+        }
+
+        @Override
+        public boolean isSuspended() {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return false;
+        }
+
+        @Override
+        public boolean setTimeout(long time, TimeUnit unit) {
+            return false;
+        }
+
+        @Override
+        public void setTimeoutHandler(TimeoutHandler handler) {
+
+        }
+
+        @Override
+        public Collection<Class<?>> register(Class<?> callback) {
+            return null;
+        }
+
+        @Override
+        public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, 
Class<?>... callbacks) {
+            return null;
+        }
+
+        @Override
+        public Collection<Class<?>> register(Object callback) {
+            return null;
+        }
+
+        @Override
+        public Map<Class<?>, Collection<Class<?>>> register(Object callback, 
Object... callbacks) {
+            return null;
+        }
+
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 4ad5145..6f68f71 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -276,6 +276,7 @@ public abstract class MockedPulsarServiceBaseTest {
         
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore();
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new 
NamespaceService(pulsar));
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 1b0c7f3..ec9fde5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -129,7 +129,7 @@ public class OwnerShipForCurrentServerTestBase {
         
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
-
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new 
NamespaceService(pulsar));
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 5688f97..12dcac5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -57,12 +57,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -785,7 +785,11 @@ public class BrokerServiceTest extends BrokerTestBase {
     @Test
     public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
         final String namespace = "prop/disableBundle";
-        admin.namespaces().createNamespace(namespace);
+        try {
+            admin.namespaces().createNamespace(namespace);
+        } catch (PulsarAdminException.ConflictException e) {
+            // Ok.. (if test fails intermittently and namespace is already 
created)
+        }
         admin.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("test"));
 
         // own namespace bundle
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 6a2cfaa..6a5e254 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -133,7 +133,7 @@ public class TransactionTestBase {
         
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
-
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new 
NamespaceService(pulsar));
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 04f9d81..b9eb85e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.KeyManager;
@@ -57,6 +58,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -67,9 +69,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -323,8 +328,6 @@ public class WebServiceTest {
         }
     }
 
-    MockedZooKeeperClientFactoryImpl zkFactory = new 
MockedZooKeeperClientFactoryImpl();
-
     private void setupEnv(boolean enableFilter, String minApiVersion, boolean 
allowUnversionedClients,
             boolean enableTls, boolean enableAuth, boolean allowInsecure, 
double rateLimit) throws Exception {
         if (pulsar != null) {
@@ -363,8 +366,20 @@ public class WebServiceTest {
         }
 
         pulsar = spy(new PulsarService(config));
-        doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory();
-        doReturn(new 
ZKMetadataStore(MockZooKeeper.newInstance())).when(pulsar).createLocalMetadataStore();
+     // mock zk
+        MockZooKeeper mockZooKeeper = 
MockedPulsarServiceBaseTest.createMockZooKeeper();
+        ZooKeeperClientFactory mockZooKeeperClientFactory = new 
ZooKeeperClientFactory() {
+
+             @Override
+             public CompletableFuture<ZooKeeper> create(String serverList, 
SessionType sessionType,
+                     int zkSessionTimeoutMillis) {
+                 // Always return the same instance (so that we don't loose 
the mock ZK content on broker restart
+                 return CompletableFuture.completedFuture(mockZooKeeper);
+             }
+         };
+        
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
         doReturn(new 
MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
         pulsar.start();
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
index fbd5f77..d9ffda0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
@@ -35,6 +35,10 @@ public class MetadataStoreException extends IOException {
         super(msg);
     }
 
+    public MetadataStoreException(String msg, Throwable t) {
+        super(msg, t);
+    }
+
     /**
      * Implementation is invalid
      */
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 6706108..8ad90df 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-
+import com.google.common.annotations.VisibleForTesting;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +34,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
-
+import lombok.Getter;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -49,6 +49,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     private static final long CACHE_REFRESH_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
 
+    @Getter
     private final MetadataStore store;
     private final MetadataSerde<T> serde;
 
@@ -227,6 +228,11 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         objCache.synchronous().invalidate(path);
     }
 
+    @VisibleForTesting
+    public void invalidateAll() {
+        objCache.synchronous().invalidateAll();
+    }
+
     @Override
     public void accept(Notification t) {
         String path = t.getPath();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 42293c7..a7ff703 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -211,6 +212,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         executor.awaitTermination(10, TimeUnit.SECONDS);
     }
 
+    @VisibleForTesting
+    public void invalidateAll() {
+        childrenCache.synchronous().invalidateAll();
+        existsCache.synchronous().invalidateAll();
+    }
+
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
index 7a051ab..352db0b 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.data.ACL;
 
 public class MockedZooKeeperClientFactoryImpl implements 
ZooKeeperClientFactory {
 
-    Queue<MockZooKeeper> createdInstances = new ConcurrentLinkedQueue<>();
+    public Queue<MockZooKeeper> createdInstances = new 
ConcurrentLinkedQueue<>();
 
     @Override
     public CompletableFuture<ZooKeeper> create(String serverList, SessionType 
sessionType, int zkSessionTimeoutMillis) {

Reply via email to