This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 0edcaa0 [pulsar-broker] cluster resources use metadata-store api (#9338) 0edcaa0 is described below commit 0edcaa09150521a2a7e189de43d004ed799db2ee Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Jan 28 12:20:10 2021 -0800 [pulsar-broker] cluster resources use metadata-store api (#9338) * [pulsar-broker] Make tenant rest-api async and use metadata-store api fix tests fix intermittent test failure * [pulsar-broker] cluster resources use metadata-store api * fix test --- .../org/apache/pulsar/broker/PulsarService.java | 18 +- .../apache/pulsar/broker/admin/AdminResource.java | 6 +- .../pulsar/broker/admin/impl/BaseResources.java | 152 ++++++++++ .../pulsar/broker/admin/impl/ClusterResources.java | 51 ++++ .../pulsar/broker/admin/impl/ClustersBase.java | 204 +++++--------- .../broker/admin/impl/NamespaceResources.java | 51 ++++ .../pulsar/broker/admin/impl/PulsarResources.java | 37 +++ .../pulsar/broker/admin/impl/TenantResources.java | 28 ++ .../pulsar/broker/admin/impl/TenantsBase.java | 307 ++++++++++++--------- .../pulsar/broker/web/PulsarWebResource.java | 157 ++++++++++- .../apache/pulsar/broker/admin/AdminApiTest.java | 8 +- .../org/apache/pulsar/broker/admin/AdminTest.java | 238 +++++++++++----- .../broker/auth/MockedPulsarServiceBaseTest.java | 1 + .../OwnerShipForCurrentServerTestBase.java | 2 +- .../pulsar/broker/service/BrokerServiceTest.java | 8 +- .../broker/transaction/TransactionTestBase.java | 2 +- .../apache/pulsar/broker/web/WebServiceTest.java | 23 +- .../metadata/api/MetadataStoreException.java | 4 + .../metadata/cache/impl/MetadataCacheImpl.java | 10 +- .../metadata/impl/AbstractMetadataStore.java | 7 + .../MockedZooKeeperClientFactoryImpl.java | 2 +- 21 files changed, 958 insertions(+), 358 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a017d28..3e03297 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -69,6 +69,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.ZookeeperSessionExpiredHandlers; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.admin.impl.PulsarResources; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -220,6 +221,8 @@ public class PulsarService implements AutoCloseable { private MetadataStoreExtended localMetadataStore; private CoordinationService coordinationService; + private MetadataStoreExtended configurationMetadataStore; + private PulsarResources pulsarResources; public enum State { Init, Started, Closed @@ -280,6 +283,14 @@ public class PulsarService implements AutoCloseable { new DefaultThreadFactory("zk-cache-callback")); } + public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { + return MetadataStoreExtended.create(config.getConfigurationStoreServers(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis()) + .allowReadOnlyOperations(false) + .build()); + } + /** * Close the current pulsar service. All resources are released. */ @@ -396,6 +407,9 @@ public class PulsarService implements AutoCloseable { if (localMetadataStore != null) { localMetadataStore.close(); } + if (configurationMetadataStore != null) { + configurationMetadataStore.close(); + } state = State.Closed; isClosedCondition.signalAll(); @@ -467,9 +481,11 @@ public class PulsarService implements AutoCloseable { } localMetadataStore = createLocalMetadataStore(); - coordinationService = new CoordinationServiceImpl(localMetadataStore); + configurationMetadataStore = createConfigurationMetadataStore(); + pulsarResources = new PulsarResources(configurationMetadataStore); + orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecutorThreads()) .name("pulsar-ordered") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1b98ba1..1728c88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -89,7 +89,7 @@ import org.slf4j.LoggerFactory; public abstract class AdminResource extends PulsarWebResource { private static final Logger log = LoggerFactory.getLogger(AdminResource.class); - private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; + public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; @@ -169,7 +169,7 @@ public abstract class AdminResource extends PulsarWebResource { // This is a stub method for Mockito @Override - protected void validateSuperUserAccess() { + public void validateSuperUserAccess() { super.validateSuperUserAccess(); } @@ -740,7 +740,7 @@ public abstract class AdminResource extends PulsarWebResource { protected void validateClusterExists(String cluster) { try { - if (!clustersCache().get(path("clusters", cluster)).isPresent()) { + if (!clusterResources().get(path("clusters", cluster)).isPresent()) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); } } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java new file mode 100644 index 0000000..07cd9c4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import lombok.Getter; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +/** + * Base class for all configuration resources to access configurations from metadata-store. + * + * @param <T> + * type of configuration-resources. + */ +public class BaseResources<T> { + + @Getter + private final MetadataStoreExtended store; + @Getter + private final MetadataCache<T> cache; + + public BaseResources(MetadataStoreExtended store, Class<T> clazz) { + this.store = store; + this.cache = store.getMetadataCache(clazz); + } + + public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef) { + this.store = store; + this.cache = store.getMetadataCache(typeRef); + } + + public List<String> getChildren(String path) throws MetadataStoreException { + try { + return getChildrenAsync(path).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to get childeren of " + path, e); + } + } + + public CompletableFuture<List<String>> getChildrenAsync(String path) { + return cache.getChildren(path); + } + + public Optional<T> get(String path) throws MetadataStoreException { + try { + return getAsync(path).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to get data from " + path, e); + } + } + + public CompletableFuture<Optional<T>> getAsync(String path) { + return cache.get(path); + } + + public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException { + try { + setAsync(path, modifyFunction).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to set data for " + path, e); + } + } + + public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFunction) { + return cache.readModifyUpdate(path, modifyFunction); + } + + public void create(String path, T data) throws MetadataStoreException { + create(path, t -> data); + } + + public void create(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException { + try { + createAsync(path, createFunction).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to create " + path, e); + } + } + + public CompletableFuture<Void> createAsync(String path, T data) { + return createAsync(path, t -> data); + } + + public CompletableFuture<Void> createAsync(String path, Function<Optional<T>, T> createFunction) { + return cache.readModifyUpdateOrCreate(path, createFunction); + } + + public void delete(String path) throws MetadataStoreException { + try { + deleteAsync(path).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to delete " + path, e); + } + } + + public CompletableFuture<Void> deleteAsync(String path) { + return cache.delete(path); + } + + public boolean exists(String path) throws MetadataStoreException { + try { + return existsAsync(path).get(); + } catch (ExecutionException e) { + throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause() + : new MetadataStoreException(e.getCause()); + } catch (Exception e) { + throw new MetadataStoreException("Failed to check exist " + path, e); + } + } + + public CompletableFuture<Boolean> existsAsync(String path) { + return cache.exists(path); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java new file mode 100644 index 0000000..d580f2e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.impl; + +import java.util.HashSet; +import java.util.Set; +import lombok.Getter; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +public class ClusterResources extends BaseResources<ClusterData> { + + public static final String CLUSTERS_ROOT = "/admin/clusters"; + @Getter + private FailureDomainResources failureDomainResources; + + public ClusterResources(MetadataStoreExtended store) { + super(store, ClusterData.class); + this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class); + } + + public Set<String> list() throws MetadataStoreException { + return new HashSet<>(super.getChildren(CLUSTERS_ROOT)); + } + + public static class FailureDomainResources extends BaseResources<FailureDomain> { + public static final String FAILURE_DOMAIN = "failureDomain"; + + public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz) { + super(store, clazz); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index d2714ec..b483114 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -20,8 +20,6 @@ package org.apache.pulsar.broker.admin.impl; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES; -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; @@ -30,7 +28,6 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Example; import io.swagger.annotations.ExampleProperty; -import java.io.IOException; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; @@ -51,9 +48,10 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.admin.impl.ClusterResources.FailureDomainResources; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.common.naming.Constants; @@ -66,14 +64,11 @@ import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; +import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ClustersBase extends AdminResource { +public class ClustersBase extends PulsarWebResource { @GET @ApiOperation( @@ -87,7 +82,7 @@ public class ClustersBase extends AdminResource { public Set<String> getClusters() throws Exception { try { // Remove "global" cluster from returned list - Set<String> clusters = clustersListCache().get().stream() + Set<String> clusters = clusterResources().list().stream() .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet()); return clusters; } catch (Exception e) { @@ -119,7 +114,7 @@ public class ClustersBase extends AdminResource { validateSuperUserAccess(); try { - return clustersCache().get(path("clusters", cluster)) + return clusterResources().get(path("clusters", cluster)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); } catch (Exception e) { log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); @@ -171,11 +166,12 @@ public class ClustersBase extends AdminResource { try { NamedEntity.checkName(cluster); - zkCreate(path("clusters", cluster), jsonMapper().writeValueAsBytes(clusterData)); + if (clusterResources().get(path("clusters", cluster)).isPresent()) { + log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster); + throw new RestException(Status.CONFLICT, "Cluster already exists"); + } + clusterResources().create(path("clusters", cluster), clusterData); log.info("[{}] Created cluster {}", clientAppId(), cluster); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster); - throw new RestException(Status.CONFLICT, "Cluster already exists"); } catch (IllegalArgumentException e) { log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e); throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid"); @@ -222,23 +218,12 @@ public class ClustersBase extends AdminResource { validatePoliciesReadOnlyAccess(); try { - String clusterPath = path("clusters", cluster); - Stat nodeStat = new Stat(); - byte[] content = globalZk().getData(clusterPath, null, nodeStat); - ClusterData currentClusterData = null; - if (content.length > 0) { - currentClusterData = jsonMapper().readValue(content, ClusterData.class); - // only update cluster-url-data and not overwrite other metadata such as peerClusterNames - currentClusterData.update(clusterData); - } else { - currentClusterData = clusterData; - } - // Write back the new updated ClusterData into zookeeper - globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData), - nodeStat.getVersion()); - globalZkCache().invalidate(clusterPath); + clusterResources().set(path("clusters", cluster), old -> { + old.update(clusterData); + return old; + }); log.info("[{}] Updated cluster {}", clientAppId(), cluster); - } catch (KeeperException.NoNodeException e) { + } catch (NotFoundException e) { log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster); throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); } catch (Exception e) { @@ -292,7 +277,7 @@ public class ClustersBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, cluster + " itself can't be part of peer-list"); } - clustersCache().get(path("clusters", peerCluster)) + clusterResources().get(path("clusters", peerCluster)) .orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED, "Peer cluster " + peerCluster + " does not exist")); } catch (RestException e) { @@ -308,16 +293,12 @@ public class ClustersBase extends AdminResource { } try { - String clusterPath = path("clusters", cluster); - Stat nodeStat = new Stat(); - byte[] content = globalZk().getData(clusterPath, null, nodeStat); - ClusterData currentClusterData = jsonMapper().readValue(content, ClusterData.class); - currentClusterData.setPeerClusterNames(peerClusterNames); - // Write back the new updated ClusterData into zookeeper - globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData), nodeStat.getVersion()); - globalZkCache().invalidate(clusterPath); + clusterResources().set(path("clusters", cluster), old -> { + old.setPeerClusterNames(peerClusterNames); + return old; + }); log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster); - } catch (KeeperException.NoNodeException e) { + } catch (NotFoundException e) { log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster); throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); } catch (Exception e) { @@ -347,15 +328,10 @@ public class ClustersBase extends AdminResource { @PathParam("cluster") String cluster ) { validateSuperUserAccess(); - try { - String clusterPath = path("clusters", cluster); - byte[] content = globalZk().getData(clusterPath, null, null); - ClusterData clusterData = jsonMapper().readValue(content, ClusterData.class); + ClusterData clusterData = clusterResources().get(path("clusters", cluster)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); return clusterData.getPeerClusterNames(); - } catch (KeeperException.NoNodeException e) { - log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), cluster); - throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); } catch (Exception e) { log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); throw new RestException(e); @@ -388,12 +364,12 @@ public class ClustersBase extends AdminResource { // Check that the cluster is not used by any property (eg: no namespaces provisioned there) boolean isClusterUsed = false; try { - for (String property : globalZk().getChildren(path(POLICIES), false)) { - if (globalZk().exists(path(POLICIES, property, cluster), false) == null) { + for (String property : tenantResources().getChildren(path(POLICIES))) { + if (!clusterResources().exists(path(POLICIES, property, cluster))) { continue; } - if (!globalZk().getChildren(path(POLICIES, property, cluster), false).isEmpty()) { + if (!clusterResources().getChildren(path(POLICIES, property, cluster)).isEmpty()) { // We found a property that has at least a namespace in this cluster isClusterUsed = true; break; @@ -402,13 +378,12 @@ public class ClustersBase extends AdminResource { // check the namespaceIsolationPolicies associated with the cluster String path = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); - Optional<NamespaceIsolationPolicies> nsIsolationPolicies = namespaceIsolationPoliciesCache().get(path); + Optional<NamespaceIsolationPolicies> nsIsolationPolicies = namespaceIsolationPolicies().getPolicies(path); // Need to delete the isolation policies if present if (nsIsolationPolicies.isPresent()) { if (nsIsolationPolicies.get().getPolicies().isEmpty()) { - globalZk().delete(path, -1); - namespaceIsolationPoliciesCache().invalidate(path); + namespaceIsolationPolicies().delete(path); } else { isClusterUsed = true; } @@ -426,10 +401,9 @@ public class ClustersBase extends AdminResource { try { String clusterPath = path("clusters", cluster); deleteFailureDomain(clusterPath); - globalZk().delete(clusterPath, -1); - globalZkCache().invalidate(clusterPath); + clusterResources().delete(clusterPath); log.info("[{}] Deleted cluster {}", clientAppId(), cluster); - } catch (KeeperException.NoNodeException e) { + } catch (NotFoundException e) { log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster); throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); } catch (Exception e) { @@ -441,16 +415,14 @@ public class ClustersBase extends AdminResource { private void deleteFailureDomain(String clusterPath) { try { String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN); - if (globalZk().exists(failureDomain, false) == null) { + if (!clusterResources().exists(failureDomain)) { return; } - for (String domain : globalZk().getChildren(failureDomain, false)) { + for (String domain : clusterResources().getChildren(failureDomain)) { String domainPath = joinPath(failureDomain, domain); - globalZk().delete(domainPath, -1); + clusterResources().delete(domainPath); } - globalZk().delete(failureDomain, -1); - failureDomainCache().clear(); - failureDomainListCache().clear(); + clusterResources().delete(failureDomain); } catch (Exception e) { log.warn("Failed to delete failure-domain under cluster {}", clusterPath); throw new RestException(e); @@ -478,13 +450,13 @@ public class ClustersBase extends AdminResource { @PathParam("cluster") String cluster ) throws Exception { validateSuperUserAccess(); - if (!clustersCache().get(path("clusters", cluster)).isPresent()) { + if (!clusterResources().exists(path("clusters", cluster))) { throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist."); } try { - NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) + NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies() + .getPolicies(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to Namespace isolation data map @@ -524,8 +496,8 @@ public class ClustersBase extends AdminResource { validateClusterExists(cluster); try { - NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) + NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies() + .getPolicies(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to Namespace isolation data map @@ -577,8 +549,8 @@ public class ClustersBase extends AdminResource { throw new RestException(e); } try { - Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache() - .get(nsIsolationPoliciesPath); + Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies() + .getPolicies(nsIsolationPoliciesPath); if (!nsPoliciesResult.isPresent()) { throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); } @@ -639,8 +611,8 @@ public class ClustersBase extends AdminResource { final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); Map<String, NamespaceIsolationData> nsPolicies; try { - Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache() - .get(nsIsolationPoliciesPath); + Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies() + .getPolicies(nsIsolationPoliciesPath); if (!nsPoliciesResult.isPresent()) { throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); } @@ -710,21 +682,18 @@ public class ClustersBase extends AdminResource { jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData); String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); - NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(nsIsolationPolicyPath).orElseGet(() -> { + NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies() + .getPolicies(nsIsolationPolicyPath).orElseGet(() -> { try { - this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap())); + namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap()); return new NamespaceIsolationPolicies(); - } catch (KeeperException | InterruptedException e) { + } catch (Exception e) { throw new RestException(e); } }); nsIsolationPolicies.setPolicy(policyName, policyData); - globalZk().setData(nsIsolationPolicyPath, jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()), - -1); - // make sure that the cache content will be refreshed for the next read access - namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath); + namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> nsIsolationPolicies.getPolicies()); // whether or not make the isolation update on time. if (pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) { @@ -738,7 +707,7 @@ public class ClustersBase extends AdminResource { clientAppId(), cluster, policyName, iae); asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput)); - } catch (KeeperException.NoNodeException nne) { + } catch (NotFoundException nne) { log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), cluster); asyncResponse.resume(new RestException(Status.NOT_FOUND, @@ -832,29 +801,6 @@ public class ClustersBase extends AdminResource { }); } - private boolean createZnodeIfNotExist(String path, Optional<Object> value) - throws KeeperException, InterruptedException { - // create persistent node on ZooKeeper - if (globalZk().exists(path, false) == null) { - // create all the intermediate nodes - try { - ZkUtils.createFullPathOptimistic(globalZk(), path, - value.isPresent() ? jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - return true; - } catch (KeeperException.NodeExistsException nee) { - if (log.isDebugEnabled()) { - log.debug("Other broker preempted the full path [{}] already. Continue...", path); - } - } catch (JsonGenerationException e) { - // ignore json error as it is empty hash - } catch (JsonMappingException e) { - } catch (IOException e) { - } - } - return false; - } - @DELETE @Path("/{cluster}/namespaceIsolationPolicies/{policyName}") @ApiOperation( @@ -886,22 +832,19 @@ public class ClustersBase extends AdminResource { try { String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); - NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(nsIsolationPolicyPath).orElseGet(() -> { + NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies() + .getPolicies(nsIsolationPolicyPath).orElseGet(() -> { try { - this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap())); + namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap()); return new NamespaceIsolationPolicies(); - } catch (KeeperException | InterruptedException e) { + } catch (Exception e) { throw new RestException(e); } }); nsIsolationPolicies.deletePolicy(policyName); - globalZk().setData(nsIsolationPolicyPath, jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()), - -1); - // make sure that the cache content will be refreshed for the next read access - namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath); - } catch (KeeperException.NoNodeException nne) { + namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> nsIsolationPolicies.getPolicies()); + } catch (NotFoundException nne) { log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), cluster); throw new RestException(Status.NOT_FOUND, @@ -949,15 +892,9 @@ public class ClustersBase extends AdminResource { try { String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName); - if (this.createZnodeIfNotExist(domainPath, Optional.ofNullable(domain))) { - // clear domains-children cache - this.failureDomainListCache().clear(); - } else { - globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(domain), -1); - // make sure that the domain-cache will be refreshed for the next read access - failureDomainCache().invalidate(domainPath); - } - } catch (KeeperException.NoNodeException nne) { + FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources(); + failureDomainListCache.create(domainPath, old -> domain); + } catch (NotFoundException nne) { log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster, domainName); throw new RestException(Status.NOT_FOUND, @@ -992,16 +929,17 @@ public class ClustersBase extends AdminResource { Map<String, FailureDomain> domains = Maps.newHashMap(); try { final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; - for (String domainName : failureDomainListCache().get()) { + FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources(); + for (String domainName : failureDomainListCache.getChildren(failureDomainRootPath)) { try { - Optional<FailureDomain> domain = failureDomainCache() + Optional<FailureDomain> domain = failureDomainListCache .get(joinPath(failureDomainRootPath, domainName)); domain.ifPresent(failureDomain -> domains.put(domainName, failureDomain)); } catch (Exception e) { log.warn("Failed to get domain {}", domainName, e); } } - } catch (KeeperException.NoNodeException e) { + } catch (NotFoundException e) { log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e); return Collections.emptyMap(); } catch (Exception e) { @@ -1041,7 +979,7 @@ public class ClustersBase extends AdminResource { try { final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; - return failureDomainCache().get(joinPath(failureDomainRootPath, domainName)) + return clusterResources().getFailureDomainResources().get(joinPath(failureDomainRootPath, domainName)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Domain " + domainName + " for cluster " + cluster + " does not exist")); } catch (RestException re) { @@ -1082,11 +1020,8 @@ public class ClustersBase extends AdminResource { try { final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName); - globalZk().delete(domainPath, -1); - // clear domain cache - failureDomainCache().invalidate(domainPath); - failureDomainListCache().clear(); - } catch (KeeperException.NoNodeException nne) { + clusterResources().getFailureDomainResources().delete(domainPath); + } catch (NotFoundException nne) { log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster); throw new RestException(Status.NOT_FOUND, "Domain-name " + domainName + " or cluster " + cluster + " does not exist"); @@ -1101,13 +1036,14 @@ public class ClustersBase extends AdminResource { if (inputDomain != null && inputDomain.brokers != null) { try { final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; - for (String domainName : failureDomainListCache().get()) { + for (String domainName : clusterResources().getFailureDomainResources() + .getChildren(failureDomainRootPath)) { if (inputDomainName.equals(domainName)) { continue; } try { Optional<FailureDomain> domain = - failureDomainCache() + clusterResources().getFailureDomainResources() .get(joinPath(failureDomainRootPath, domainName)); if (domain.isPresent() && domain.get().brokers != null) { List<String> duplicateBrokers = domain.get().brokers.stream().parallel() @@ -1124,7 +1060,7 @@ public class ClustersBase extends AdminResource { log.warn("Failed to get domain {}", domainName, e); } } - } catch (KeeperException.NoNodeException e) { + } catch (NotFoundException e) { if (log.isDebugEnabled()) { log.debug("[{}] Domain is not configured for cluster", clientAppId(), e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java new file mode 100644 index 0000000..966d421 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.Map; +import java.util.Optional; +import lombok.Getter; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +public class NamespaceResources extends BaseResources<Policies> { + @Getter + private IsolationPolicyResources isolationPolicies; + + public NamespaceResources(MetadataStoreExtended store) { + super(store, Policies.class); + isolationPolicies = new IsolationPolicyResources(store); + } + + public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationData>> { + public IsolationPolicyResources(MetadataStoreExtended store) { + super(store, new TypeReference<Map<String, NamespaceIsolationData>>() { + }); + } + + public Optional<NamespaceIsolationPolicies> getPolicies(String path) throws MetadataStoreException { + Optional<Map<String, NamespaceIsolationData>> data = super.get(path); + return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java new file mode 100644 index 0000000..4384762 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.impl; + +import lombok.AccessLevel; +import lombok.Getter; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +@Getter(AccessLevel.PUBLIC) +public class PulsarResources { + + private TenantResources tenatResources; + private ClusterResources clusterResources; + private NamespaceResources namespaceResources; + + public PulsarResources(MetadataStoreExtended configurationMetadataStore) { + tenatResources = new TenantResources(configurationMetadataStore); + clusterResources = new ClusterResources(configurationMetadataStore); + namespaceResources = new NamespaceResources(configurationMetadataStore); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java new file mode 100644 index 0000000..1a4fc38 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.impl; + +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +public class TenantResources extends BaseResources<TenantInfo> { + public TenantResources(MetadataStoreExtended store) { + super(store, TenantInfo.class); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index e1c270d..9c28046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -24,8 +24,10 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -33,35 +35,45 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TenantsBase extends AdminResource { +public class TenantsBase extends PulsarWebResource { + + private static final Logger log = LoggerFactory.getLogger(TenantsBase.class); @GET @ApiOperation(value = "Get the list of existing tenants.", response = String.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "Tenant doesn't exist") }) - public List<String> getTenants() { - validateSuperUserAccess(); - + public void getTenants(@Suspended final AsyncResponse asyncResponse) { + final String clientAppId = clientAppId(); try { - List<String> tenants = globalZk().getChildren(path(POLICIES), false); - tenants.sort(null); - return tenants; + validateSuperUserAccess(); } catch (Exception e) { - log.error("[{}] Failed to get tenants list", clientAppId(), e); - throw new RestException(e); + asyncResponse.resume(e); + return; } + tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) -> { + if (e != null) { + log.error("[{}] Failed to get tenants list", clientAppId, e); + asyncResponse.resume(new RestException(e)); + return; + } + tenants.sort(null); + asyncResponse.resume(tenants); + }); } @GET @@ -69,18 +81,25 @@ public class TenantsBase extends AdminResource { @ApiOperation(value = "Get the admin configuration for a given tenant.") @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "Tenant does not exist") }) - public TenantInfo getTenantAdmin( - @ApiParam(value = "The tenant name") - @PathParam("tenant") String tenant) { - validateSuperUserAccess(); - + public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) { + final String clientAppId = clientAppId(); try { - return tenantsCache().get(path(POLICIES, tenant)) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist")); + validateSuperUserAccess(); } catch (Exception e) { - log.error("[{}] Failed to get tenant {}", clientAppId(), tenant, e); - throw new RestException(e); + asyncResponse.resume(e); } + + tenantResources().getAsync(path(POLICIES, tenant)).whenComplete((tenantInfo, e) -> { + if (e != null) { + log.error("[{}] Failed to get Tenant {}", clientAppId, e.getMessage()); + asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant")); + return; + } + boolean response = tenantInfo.isPresent() ? asyncResponse.resume(tenantInfo.get()) + : asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant does not exist")); + return; + }); } @PUT @@ -91,103 +110,113 @@ public class TenantsBase extends AdminResource { @ApiResponse(code = 412, message = "Tenant name is not valid"), @ApiResponse(code = 412, message = "Clusters can not be empty"), @ApiResponse(code = 412, message = "Clusters do not exist") }) - public void createTenant( - @ApiParam(value = "The tenant name") - @PathParam("tenant") String tenant, - @ApiParam(value = "TenantInfo") TenantInfo config) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - validateClusters(config); + public void createTenant(@Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, + @ApiParam(value = "TenantInfo") TenantInfo tenantInfo) { + final String clientAppId = clientAppId(); try { + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + validateClusters(tenantInfo); NamedEntity.checkName(tenant); + } catch (IllegalArgumentException e) { + log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e); + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); + return; + } catch (Exception e) { + asyncResponse.resume(e); + return; + } + + tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) -> { + if (e != null) { + log.error("[{}] Failed to create tenant ", clientAppId, e.getCause()); + asyncResponse.resume(new RestException(e)); + return; + } int maxTenants = pulsar().getConfiguration().getMaxTenants(); - //Due to the cost of distributed locks, no locks are added here. - //In a concurrent scenario, the threshold will be exceeded. + // Due to the cost of distributed locks, no locks are added here. + // In a concurrent scenario, the threshold will be exceeded. if (maxTenants > 0) { - List<String> tenants = globalZk().getChildren(path(POLICIES), false); if (tenants != null && tenants.size() >= maxTenants) { - throw new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants"); + asyncResponse.resume( + new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants")); + return; } } - zkCreate(path(POLICIES, tenant), jsonMapper().writeValueAsBytes(config)); - log.info("[{}] Created tenant {}", clientAppId(), tenant); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Failed to create already existing tenant {}", clientAppId(), tenant); - throw new RestException(Status.CONFLICT, "Tenant already exists"); - } catch (IllegalArgumentException e) { - log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e); - throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"); - } catch (Exception e) { - log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, e); - throw new RestException(e); - } + tenantResources().existsAsync(path(POLICIES, tenant)).thenAccept(exist ->{ + if (exist) { + asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist")); + return; + } + tenantResources().createAsync(path(POLICIES, tenant), tenantInfo).thenAccept((r) -> { + log.info("[{}] Created tenant {}", clientAppId(), tenant); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to create tenant {}", clientAppId, tenant, e); + asyncResponse.resume(new RestException(ex)); + return null; + }); + }).exceptionally(ex -> { + log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, e); + asyncResponse.resume(new RestException(ex)); + return null; + }); + }); } @POST @Path("/{tenant}") @ApiOperation(value = "Update the admins for a tenant.", - notes = "This operation requires Pulsar super-user privileges.") + notes = "This operation requires Pulsar super-user privileges.") @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 409, message = "Tenant already exists"), @ApiResponse(code = 412, message = "Clusters can not be empty"), @ApiResponse(code = 412, message = "Clusters do not exist") }) - public void updateTenant( - @ApiParam(value = "The tenant name") - @PathParam("tenant") String tenant, - @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - validateClusters(newTenantAdmin); - - Stat nodeStat = new Stat(); + public void updateTenant(@Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, + @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) { try { - byte[] content = globalZk().getData(path(POLICIES, tenant), null, nodeStat); - TenantInfo oldTenantAdmin = jsonMapper().readValue(content, TenantInfo.class); - List<String> clustersWithActiveNamespaces = Lists.newArrayList(); - if (oldTenantAdmin.getAllowedClusters().size() > newTenantAdmin.getAllowedClusters().size()) { - // Get the colo(s) being removed from the list - oldTenantAdmin.getAllowedClusters().removeAll(newTenantAdmin.getAllowedClusters()); - log.debug("Following clusters are being removed : [{}]", oldTenantAdmin.getAllowedClusters()); - for (String cluster : oldTenantAdmin.getAllowedClusters()) { - if (Constants.GLOBAL_CLUSTER.equals(cluster)) { - continue; - } - List<String> activeNamespaces = Lists.newArrayList(); - try { - activeNamespaces = globalZk().getChildren(path(POLICIES, tenant, cluster), false); - if (activeNamespaces.size() != 0) { - // There are active namespaces in this cluster - clustersWithActiveNamespaces.add(cluster); - } - } catch (KeeperException.NoNodeException nne) { - // Fine, some cluster does not have active namespace. Move on! - } - } - if (!clustersWithActiveNamespaces.isEmpty()) { - // Throw an exception because colos being removed are having active namespaces - String msg = String.format( - "Failed to update the tenant because active namespaces are present in colos %s." - + " Please delete those namespaces first", - clustersWithActiveNamespaces); - throw new RestException(Status.CONFLICT, msg); - } - } - String tenantPath = path(POLICIES, tenant); - globalZk().setData(tenantPath, jsonMapper().writeValueAsBytes(newTenantAdmin), -1); - globalZkCache().invalidate(tenantPath); - log.info("[{}] updated tenant {}", clientAppId(), tenant); - } catch (RestException re) { - throw re; - } catch (KeeperException.NoNodeException e) { - log.warn("[{}] Failed to update tenant {}: does not exist", clientAppId(), tenant); - throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + validateClusters(newTenantAdmin); } catch (Exception e) { - log.error("[{}] Failed to update tenant {}", clientAppId(), tenant, e); - throw new RestException(e); + asyncResponse.resume(e); + return; } + + final String clientAddId = clientAppId(); + tenantResources().getAsync(path(POLICIES, tenant)).thenAccept(tenantAdmin -> { + if (!tenantAdmin.isPresent()) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found")); + return; + } + TenantInfo oldTenantAdmin = tenantAdmin.get(); + Set<String> newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters()); + canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters).thenApply(r -> { + tenantResources().setAsync(path(POLICIES, tenant), old -> { + return newTenantAdmin; + }).thenAccept(done -> { + log.info("Successfully updated tenant info {}", tenant); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.warn("Failed to update tenant {}", tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); + return null; + }).exceptionally(nsEx -> { + asyncResponse.resume(nsEx.getCause()); + return null; + }); + }).exceptionally(ex -> { + log.error("[{}] Failed to get tenant {}", clientAddId, tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); } @DELETE @@ -196,47 +225,59 @@ public class TenantsBase extends AdminResource { @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 409, message = "The tenant still has active namespaces") }) - public void deleteTenant( - @PathParam("tenant") - @ApiParam(value = "The tenant name") - String tenant) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - - boolean isTenantEmpty; + public void deleteTenant(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant) { try { - isTenantEmpty = getListOfNamespaces(tenant).isEmpty(); - } catch (KeeperException.NoNodeException e) { - log.warn("[{}] Failed to delete tenant {}: does not exist", clientAppId(), tenant); - throw new RestException(Status.NOT_FOUND, "The tenant does not exist"); + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); } catch (Exception e) { - log.error("[{}] Failed to get tenant status {}", clientAppId(), tenant, e); - throw new RestException(e); + asyncResponse.resume(e); + return; } - if (!isTenantEmpty) { - log.warn("[{}] Failed to delete tenant {}: not empty", clientAppId(), tenant); - throw new RestException(Status.CONFLICT, "The tenant still has active namespaces"); - } - - try { - // First try to delete every cluster z-node - for (String cluster : globalZk().getChildren(path(POLICIES, tenant), false)) { - globalZk().delete(path(POLICIES, tenant, cluster), -1); + tenantResources().existsAsync(path(POLICIES, tenant)).thenApply(exists ->{ + if (!exists) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist")); + return null; } - - globalZk().delete(path(POLICIES, tenant), -1); - log.info("[{}] Deleted tenant {}", clientAppId(), tenant); - } catch (Exception e) { - log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, e); - throw new RestException(e); - } + return hasActiveNamespace(tenant).thenAccept(ns -> { + try { + // already fetched children and they should be in the cache + List<CompletableFuture<Void>> clusterList = Lists.newArrayList(); + for (String cluster : tenantResources().getChildrenAsync(path(POLICIES, tenant)).get()) { + clusterList.add(tenantResources().deleteAsync(path(POLICIES, tenant, cluster))); + } + FutureUtil.waitForAll(clusterList).thenAccept(c -> { + tenantResources().deleteAsync(path(POLICIES, tenant)).thenAccept(t -> { + log.info("[{}] Deleted tenant {}", clientAppId(), tenant); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("Failed to delete tenant {}", tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); + }).exceptionally(ex -> { + log.error("Failed to delete clusters under tenant {}", tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); + log.info("[{}] Deleted tenant {}", clientAppId(), tenant); + } catch (Exception e) { + log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, e); + asyncResponse.resume(new RestException(e)); + } + }).exceptionally(ex -> { + log.error("Failed to delete tenant due to active namespace {}", tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); + }); } private void validateClusters(TenantInfo info) { // empty cluster shouldn't be allowed - if (info == null || info.getAllowedClusters().stream() - .filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty() + if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)) + .collect(Collectors.toSet()).isEmpty() || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) { log.warn("[{}] Failed to validate due to clusters are empty", clientAppId()); throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty"); @@ -244,11 +285,11 @@ public class TenantsBase extends AdminResource { List<String> nonexistentClusters; try { - Set<String> availableClusters = clustersListCache().get(); + Set<String> availableClusters = clusterResources().list(); Set<String> allowedClusters = info.getAllowedClusters(); - nonexistentClusters = allowedClusters.stream() - .filter(cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster))) - .collect(Collectors.toList()); + nonexistentClusters = allowedClusters.stream().filter( + cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster))) + .collect(Collectors.toList()); } catch (Exception e) { log.error("[{}] Failed to get available clusters", clientAppId(), e); throw new RestException(e); @@ -258,6 +299,4 @@ public class TenantsBase extends AdminResource { throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist"); } } - - private static final Logger log = LoggerFactory.getLogger(TenantsBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index f10431c..1a307a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -21,12 +21,16 @@ package org.apache.pulsar.broker.web; import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -44,6 +48,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.admin.impl.ClusterResources; +import org.apache.pulsar.broker.admin.impl.NamespaceResources; +import org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources; +import org.apache.pulsar.broker.admin.impl.TenantResources; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -65,7 +73,9 @@ import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.path.PolicyPath; -import org.apache.zookeeper.KeeperException; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,7 +179,7 @@ public abstract class PulsarWebResource { * @throws WebApplicationException * if not authorized */ - protected void validateSuperUserAccess() { + public void validateSuperUserAccess() { if (config().isAuthenticationEnabled()) { String appId = clientAppId(); if (log.isDebugEnabled()) { @@ -245,15 +255,8 @@ public abstract class PulsarWebResource { (isClientAuthenticated(clientAppId)), clientAppId); } - TenantInfo tenantInfo; - - try { - tenantInfo = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, tenant)) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist")); - } catch (KeeperException.NoNodeException e) { - log.warn("Failed to get tenant info data for non existing tenant {}", tenant); - throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); - } + TenantInfo tenantInfo = pulsar.getPulsarResources().getTenatResources().get(path(POLICIES, tenant)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist")); if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId)) { @@ -308,7 +311,7 @@ public abstract class PulsarWebResource { protected void validateClusterForTenant(String tenant, String cluster) { TenantInfo tenantInfo; try { - tenantInfo = pulsar().getConfigurationCache().propertiesCache().get(path(POLICIES, tenant)) + tenantInfo = pulsar().getPulsarResources().getTenatResources().get(path(POLICIES, tenant)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist")); } catch (RestException e) { log.warn("Failed to get tenant admin data for tenant {}", tenant); @@ -859,4 +862,134 @@ public abstract class PulsarWebResource { } } } + + protected TenantResources tenantResources() { + return pulsar().getPulsarResources().getTenatResources(); + } + + protected ClusterResources clusterResources() { + return pulsar().getPulsarResources().getClusterResources(); + } + + protected NamespaceResources namespaceResources() { + return pulsar().getPulsarResources().getNamespaceResources(); + } + + protected IsolationPolicyResources namespaceIsolationPolicies(){ + return namespaceResources().getIsolationPolicies(); + } + + public static ObjectMapper jsonMapper() { + return ObjectMapperFactory.getThreadLocal(); + } + + public void validatePoliciesReadOnlyAccess() { + try { + if (clusterResources().existsAsync(AdminResource.POLICIES_READONLY_FLAG_PATH).get()) { + log.debug("Policies are read-only. Broker cannot do read-write operations"); + throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); + } + } catch (Exception e) { + log.warn("Unable to fetch read-only policy config {}", POLICIES_READONLY_FLAG_PATH, e); + throw new RestException(e); + } + } + + protected CompletableFuture<Void> hasActiveNamespace(String tenant) { + CompletableFuture<Void> activeNamespaceFuture = new CompletableFuture<>(); + tenantResources().getChildrenAsync(path(POLICIES, tenant)).thenAccept(clusterOrNamespaceList -> { + if (clusterOrNamespaceList == null || clusterOrNamespaceList.isEmpty()) { + activeNamespaceFuture.complete(null); + return; + } + List<CompletableFuture<Void>> activeNamespaceListFuture = Lists.newArrayList(); + clusterOrNamespaceList.forEach(clusterOrNamespace -> { + // get list of active V1 namespace + CompletableFuture<Void> checkNs = new CompletableFuture<>(); + activeNamespaceListFuture.add(checkNs); + tenantResources().getChildrenAsync(path(POLICIES, tenant, clusterOrNamespace)) + .whenComplete((children, ex) -> { + if (ex != null) { + checkNs.completeExceptionally(ex); + return; + } + if (children != null && !children.isEmpty()) { + checkNs.completeExceptionally( + new RestException(Status.PRECONDITION_FAILED, "Tenant has active namespace")); + return; + } + String namespace = NamespaceName.get(tenant, clusterOrNamespace).toString(); + // if the length is 0 then this is probably a leftover cluster from namespace + // created + // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to + // add it to the list + namespaceResources().getAsync(path(POLICIES, namespace)).thenApply(data -> { + if (data.isPresent()) { + checkNs.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + "Tenant has active namespace")); + } else { + checkNs.complete(null); + } + return null; + }).exceptionally(ex2 -> { + if (ex2.getCause() instanceof MetadataStoreException.ContentDeserializationException) { + // it's not a valid namespace-node + checkNs.complete(null); + } else { + checkNs.completeExceptionally( + new RestException(Status.INTERNAL_SERVER_ERROR, ex2.getCause())); + } + return null; + }); + }); + FutureUtil.waitForAll(activeNamespaceListFuture).thenAccept(r -> { + activeNamespaceFuture.complete(null); + }).exceptionally(ex -> { + activeNamespaceFuture.completeExceptionally(ex.getCause()); + return null; + }); + }); + }).exceptionally(ex -> { + activeNamespaceFuture.completeExceptionally(ex.getCause()); + return null; + }); + return activeNamespaceFuture; + } + + protected void validateClusterExists(String cluster) { + try { + if (!clusterResources().get(path("clusters", cluster)).isPresent()) { + throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); + } + } catch (Exception e) { + throw new RestException(e); + } + } + + protected CompletableFuture<Void> canUpdateCluster(String tenant, Set<String> oldClusters, + Set<String> newClusters) { + List<CompletableFuture<Void>> activeNamespaceFuture = Lists.newArrayList(); + for (String cluster : oldClusters) { + if (Constants.GLOBAL_CLUSTER.equals(cluster) || newClusters.contains(cluster)) { + continue; + } + CompletableFuture<Void> checkNs = new CompletableFuture<>(); + activeNamespaceFuture.add(checkNs); + tenantResources().getChildrenAsync(path(POLICIES, tenant, cluster)).whenComplete((activeNamespaces, ex) -> { + if (ex != null) { + log.warn("Failed to get namespaces under {}-{}, {}", tenant, cluster, ex.getCause().getMessage()); + checkNs.completeExceptionally(ex.getCause()); + return; + } + if (activeNamespaces.size() > 0) { + log.warn("{}/{} Active-namespaces {}", tenant, cluster, activeNamespaces); + checkNs.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, "Active namespaces")); + return; + } + checkNs.complete(null); + }); + } + return activeNamespaceFuture.isEmpty() ? CompletableFuture.completedFuture(null) + : FutureUtil.waitForAll(activeNamespaceFuture); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 8179afa..268a4ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -339,7 +339,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } catch (PulsarAdminException e) { assertTrue(e instanceof NotFoundException); } - // verify delete cluster failed try { admin.clusters().deleteCluster("test"); @@ -626,6 +625,13 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test(enabled = true) public void properties() throws PulsarAdminException { + try { + admin.tenants().getTenantInfo("does-not-exist"); + fail("should have failed"); + } catch (PulsarAdminException e) { + assertTrue(e instanceof NotFoundException); + } + Set<String> allowedClusters = Sets.newHashSet("test"); TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters); admin.tenants().updateTenant("prop-xyz", tenantInfo); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 047a174..f51eed1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -43,11 +43,15 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; @@ -82,12 +86,13 @@ import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.stats.AllocatorStats; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; +import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; import org.mockito.ArgumentCaptor; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -129,18 +134,15 @@ public class AdminTest extends MockedPulsarServiceBaseTest { clusters = spy(new Clusters()); clusters.setPulsar(pulsar); - doReturn(mockZooKeeperGlobal).when(clusters).globalZk(); + /*doReturn(mockZooKeeperGlobal).when(clusters).globalZk(); doReturn(configurationCache.clustersCache()).when(clusters).clustersCache(); doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache(); - doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache(); + doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/ doReturn("test").when(clusters).clientAppId(); doNothing().when(clusters).validateSuperUserAccess(); properties = spy(new Properties()); - properties.setServletContext(new MockServletContext()); properties.setPulsar(pulsar); - doReturn(mockZooKeeperGlobal).when(properties).globalZk(); - doReturn(configurationCache.propertiesCache()).when(properties).tenantsCache(); doReturn("test").when(properties).clientAppId(); doNothing().when(properties).validateSuperUserAccess(); @@ -239,7 +241,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com")); verify(clusters, times(1)).validateSuperUserAccess(); // ensure to read from ZooKeeper directly - clusters.clustersListCache().clear(); + //clusters.clustersListCache().clear(); assertEquals(clusters.getClusters(), Lists.newArrayList("use")); // Check creating existing cluster @@ -329,6 +331,14 @@ public class AdminTest extends MockedPulsarServiceBaseTest { && path.equals("/admin/clusters"); }); configurationCache.clustersListCache().clear(); + // clear caches to load data from metadata-store again + MetadataCacheImpl<ClusterData> clusterCache = (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources() + .getClusterResources().getCache(); + MetadataCacheImpl isolationPolicyCache = (MetadataCacheImpl) pulsar.getPulsarResources() + .getNamespaceResources().getIsolationPolicies().getCache(); + AbstractMetadataStore store = (AbstractMetadataStore) clusterCache.getStore(); + clusterCache.invalidateAll(); + store.invalidateAll(); try { clusters.getClusters(); fail("should have failed"); @@ -351,6 +361,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest { return op == MockZooKeeper.Op.GET && path.equals("/admin/clusters/test"); }); + clusterCache.invalidateAll(); + store.invalidateAll(); try { clusters.updateCluster("test", new ClusterData("http://broker.messaging.test.example.com")); fail("should have failed"); @@ -386,6 +398,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest { return op == MockZooKeeper.Op.GET && path.equals("/admin/clusters/use/namespaceIsolationPolicies"); }); + clusterCache.invalidateAll(); + isolationPolicyCache.invalidateAll(); + store.invalidateAll(); try { clusters.deleteCluster("use"); fail("should have failed"); @@ -402,9 +417,19 @@ public class AdminTest extends MockedPulsarServiceBaseTest { } } + Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception { + TestAsyncResponse ctx = new TestAsyncResponse(); + function.accept(ctx); + ctx.latch.await(); + if (ctx.e != null) { + throw (Exception) ctx.e; + } + return ctx.response; + } @Test - public void properties() throws Exception { - assertEquals(properties.getTenants(), Lists.newArrayList()); + public void properties() throws Throwable { + Object response = asynRequests(ctx -> properties.getTenants(ctx)); + assertEquals(response, Lists.newArrayList()); verify(properties, times(1)).validateSuperUserAccess(); // create local cluster @@ -413,29 +438,33 @@ public class AdminTest extends MockedPulsarServiceBaseTest { Set<String> allowedClusters = Sets.newHashSet(); allowedClusters.add(configClusterName); TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters); - properties.createTenant("test-property", tenantInfo); + response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); verify(properties, times(2)).validateSuperUserAccess(); - assertEquals(properties.getTenants(), Lists.newArrayList("test-property")); + response = asynRequests(ctx -> properties.getTenants(ctx)); + assertEquals(response, Lists.newArrayList("test-property")); verify(properties, times(3)).validateSuperUserAccess(); - assertEquals(properties.getTenantAdmin("test-property"), tenantInfo); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + assertEquals(response, tenantInfo); verify(properties, times(4)).validateSuperUserAccess(); - TenantInfo newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", "other-role"), allowedClusters); - properties.updateTenant("test-property", newPropertyAdmin); + final TenantInfo newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", "other-role"), allowedClusters); + response = asynRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); verify(properties, times(5)).validateSuperUserAccess(); // Wait for updateTenant to take effect Thread.sleep(100); - assertEquals(properties.getTenantAdmin("test-property"), newPropertyAdmin); - assertNotSame(properties.getTenantAdmin("test-property"), tenantInfo); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + assertEquals(response, newPropertyAdmin); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + assertNotSame(response, tenantInfo); verify(properties, times(7)).validateSuperUserAccess(); // Check creating existing property try { - properties.createTenant("test-property", tenantInfo); + response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); @@ -443,14 +472,14 @@ public class AdminTest extends MockedPulsarServiceBaseTest { // Check non-existing property try { - properties.getTenantAdmin("non-existing"); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } try { - properties.updateTenant("xxx-non-existing", newPropertyAdmin); + response = asynRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -458,93 +487,97 @@ public class AdminTest extends MockedPulsarServiceBaseTest { // Check deleting non-existing property try { - properties.deleteTenant("non-existing"); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "non-existing")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } + // clear caches to load data from metadata-store again + MetadataCacheImpl<TenantInfo> cache = (MetadataCacheImpl<TenantInfo>) pulsar.getPulsarResources() + .getTenatResources().getCache(); + AbstractMetadataStore store = (AbstractMetadataStore) cache.getStore(); + cache.invalidateAll(); + store.invalidateAll(); // Test zk failures mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.GET_CHILDREN - && path.equals("/admin/policies"); - }); + return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies"); + }); try { - properties.getTenants(); + response = asynRequests(ctx -> properties.getTenants(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.GET - && path.equals("/admin/policies/my-tenant"); - }); + return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); + }); try { - properties.getTenantAdmin("my-tenant"); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.GET - && path.equals("/admin/policies/my-tenant"); - }); + return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); + }); try { - properties.updateTenant("my-tenant", newPropertyAdmin); + response = asynRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.CREATE - && path.equals("/admin/policies/test"); - }); + return op == MockZooKeeper.Op.CREATE && path.equals("/admin/policies/test"); + }); try { - properties.createTenant("test", tenantInfo); + response = asynRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.GET_CHILDREN - && path.equals("/admin/policies/my-tenant"); - }); + return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies/test-property"); + }); try { - properties.deleteTenant("my-tenant"); + cache.invalidateAll(); + store.invalidateAll(); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - properties.createTenant("error-property", tenantInfo); + response = asynRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo)); mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { - return op == MockZooKeeper.Op.DELETE - && path.equals("/admin/policies/error-property"); - }); + return op == MockZooKeeper.Op.DELETE && path.equals("/admin/policies/error-property"); + }); try { - properties.deleteTenant("error-property"); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - properties.deleteTenant("test-property"); - properties.deleteTenant("error-property"); - assertEquals(properties.getTenants(), Lists.newArrayList()); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property")); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property")); + response = Lists.newArrayList(); + response = asynRequests(ctx -> properties.getTenants(ctx)); + assertEquals(response, Lists.newArrayList()); // Create a namespace to test deleting a non-empty property - newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", "other-role"), Sets.newHashSet("use")); - properties.createTenant("my-tenant", newPropertyAdmin); + TenantInfo newPropertyAdmin2 = new TenantInfo(Sets.newHashSet("role1", "other-role"), Sets.newHashSet("use")); + response = asynRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2)); namespaces.createNamespace("my-tenant", "use", "my-namespace", new BundlesData()); try { - properties.deleteTenant("my-tenant"); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant")); fail("should have failed"); } catch (RestException e) { // Ok @@ -552,7 +585,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { // Check name validation try { - properties.createTenant("test&", tenantInfo); + response = asynRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -560,7 +593,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { // Check tenantInfo is null try { - properties.createTenant("tenant-config-is-null", null); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -571,7 +604,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { Set<String> blankClusters = Sets.newHashSet(blankCluster); TenantInfo tenantWithEmptyCluster = new TenantInfo(Sets.newHashSet("role1", "role2"), blankClusters); try { - properties.createTenant("tenant-config-is-empty", tenantWithEmptyCluster); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -582,18 +615,18 @@ public class AdminTest extends MockedPulsarServiceBaseTest { containBlankClusters.add(configClusterName); TenantInfo tenantContainEmptyCluster = new TenantInfo(Sets.newHashSet(), containBlankClusters); try { - properties.createTenant("tenant-config-contain-empty", tenantContainEmptyCluster); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } - AsyncResponse response = mock(AsyncResponse.class); - namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false, false); + AsyncResponse response2 = mock(AsyncResponse.class); + namespaces.deleteNamespace(response2, "my-tenant", "use", "my-namespace", false, false); ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(captor.capture()); + verify(response2, timeout(5000).times(1)).resume(captor.capture()); assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode()); - properties.deleteTenant("my-tenant"); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant")); } @Test @@ -654,9 +687,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest { // create policies TenantInfo admin = new TenantInfo(); admin.getAllowedClusters().add(cluster); - mockZooKeeperGlobal.create(PulsarWebResource.path(POLICIES, property), - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + ClusterData clusterData = new ClusterData(cluster); + clusters.createCluster(cluster, clusterData ); + asynRequests(ctx -> properties.createTenant(ctx, property, admin)); // customized bandwidth for this namespace double customizeBandwidth = 3000; @@ -762,4 +795,85 @@ public class AdminTest extends MockedPulsarServiceBaseTest { } + static class TestAsyncResponse implements AsyncResponse { + + Object response; + Throwable e; + CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean resume(Object response) { + this.response = response; + latch.countDown(); + return true; + } + + @Override + public boolean resume(Throwable response) { + this.e = response; + latch.countDown(); + return true; + } + + @Override + public boolean cancel() { + return false; + } + + @Override + public boolean cancel(int retryAfter) { + return false; + } + + @Override + public boolean cancel(Date retryAfter) { + return false; + } + + @Override + public boolean isSuspended() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public boolean setTimeout(long time, TimeUnit unit) { + return false; + } + + @Override + public void setTimeoutHandler(TimeoutHandler handler) { + + } + + @Override + public Collection<Class<?>> register(Class<?> callback) { + return null; + } + + @Override + public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks) { + return null; + } + + @Override + public Collection<Class<?>> register(Object callback) { + return null; + } + + @Override + public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks) { + return null; + } + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 4ad5145..6f68f71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -276,6 +276,7 @@ public abstract class MockedPulsarServiceBaseTest { doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore(); Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java index 1b0c7f3..ec9fde5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java @@ -129,7 +129,7 @@ public class OwnerShipForCurrentServerTestBase { doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); - + doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 5688f97..12dcac5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -57,12 +57,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -785,7 +785,11 @@ public class BrokerServiceTest extends BrokerTestBase { @Test public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { final String namespace = "prop/disableBundle"; - admin.namespaces().createNamespace(namespace); + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test")); // own namespace bundle diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 6a2cfaa..6a5e254 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -133,7 +133,7 @@ public class TransactionTestBase { doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); - + doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 04f9d81..b9eb85e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManager; @@ -57,6 +58,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.MockedBookKeeperClientFactory; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -67,9 +69,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl; +import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; +import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.DefaultAsyncHttpClient; @@ -323,8 +328,6 @@ public class WebServiceTest { } } - MockedZooKeeperClientFactoryImpl zkFactory = new MockedZooKeeperClientFactoryImpl(); - private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowUnversionedClients, boolean enableTls, boolean enableAuth, boolean allowInsecure, double rateLimit) throws Exception { if (pulsar != null) { @@ -363,8 +366,20 @@ public class WebServiceTest { } pulsar = spy(new PulsarService(config)); - doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory(); - doReturn(new ZKMetadataStore(MockZooKeeper.newInstance())).when(pulsar).createLocalMetadataStore(); + // mock zk + MockZooKeeper mockZooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper(); + ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() { + + @Override + public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, + int zkSessionTimeoutMillis) { + // Always return the same instance (so that we don't loose the mock ZK content on broker restart + return CompletableFuture.completedFuture(mockZooKeeper); + } + }; + doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory(); pulsar.start(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java index fbd5f77..d9ffda0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java @@ -35,6 +35,10 @@ public class MetadataStoreException extends IOException { super(msg); } + public MetadataStoreException(String msg, Throwable t) { + super(msg, t); + } + /** * Implementation is invalid */ diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 6706108..8ad90df 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; - +import com.google.common.annotations.VisibleForTesting; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.List; import java.util.Map; @@ -34,7 +34,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; - +import lombok.Getter; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; @@ -49,6 +49,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); + @Getter private final MetadataStore store; private final MetadataSerde<T> serde; @@ -227,6 +228,11 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica objCache.synchronous().invalidate(path); } + @VisibleForTesting + public void invalidateAll() { + objCache.synchronous().invalidateAll(); + } + @Override public void accept(Notification t) { String path = t.getPath(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 42293c7..a7ff703 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; @@ -211,6 +212,12 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co executor.awaitTermination(10, TimeUnit.SECONDS); } + @VisibleForTesting + public void invalidateAll() { + childrenCache.synchronous().invalidateAll(); + existsCache.synchronous().invalidateAll(); + } + protected static String parent(String path) { int idx = path.lastIndexOf('/'); if (idx <= 0) { diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java index 7a051ab..352db0b 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java @@ -33,7 +33,7 @@ import org.apache.zookeeper.data.ACL; public class MockedZooKeeperClientFactoryImpl implements ZooKeeperClientFactory { - Queue<MockZooKeeper> createdInstances = new ConcurrentLinkedQueue<>(); + public Queue<MockZooKeeper> createdInstances = new ConcurrentLinkedQueue<>(); @Override public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) {