This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 39bb67542f2 [fix][broker] Fix wrong behaviour when using
namespace.allowed_clusters, such as namespace deletion and namespace policies
updating (#24860)
39bb67542f2 is described below
commit 39bb67542f2a7b849acaff681d408c693e1a2a18
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 31 01:00:25 2025 +0800
[fix][broker] Fix wrong behaviour when using namespace.allowed_clusters,
such as namespace deletion and namespace policies updating (#24860)
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 +
.../pulsar/broker/admin/impl/ClustersBase.java | 9 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 111 +++++++++--------
.../broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../pulsar/broker/service/AbstractTopic.java | 2 +
.../pulsar/broker/service/BrokerService.java | 75 ++++++++---
.../broker/service/persistent/PersistentTopic.java | 12 +-
.../pulsar/broker/web/PulsarWebResource.java | 3 +-
...AdminApiNamespaceIsolationMultiBrokersTest.java | 3 +-
.../broker/lookup/http/HttpTopicLookupv2Test.java | 2 +-
.../broker/namespace/NamespaceServiceTest.java | 6 +-
.../service/OneWayReplicatorUsingGlobalZKTest.java | 137 +++++++++++++++++++++
.../pulsar/common/policies/data/Policies.java | 61 ++++++++-
13 files changed, 343 insertions(+), 83 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 84d55430f8f..bfa1fdc812b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -514,6 +514,9 @@ public abstract class AdminResource extends
PulsarWebResource {
return getNamespacePolicies(ns);
}
+ /**
+ * Directly get the replication clusters for a namespace, without checking
allowed clusters.
+ */
protected CompletableFuture<Set<String>>
getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index d24a3255b55..0ae18fc2e54 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -792,9 +793,11 @@ public class ClustersBase extends AdminResource {
.map(tenant ->
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>>
namespaceNamesInCluster = namespaces.stream()
.map(namespaceName ->
adminClient.namespaces().getPoliciesAsync(namespaceName)
- .thenApply(policies ->
policies.replication_clusters.contains(cluster)
- ? namespaceName : null))
- .collect(Collectors.toList());
+ .thenApply(policies -> {
+ boolean allowed =
pulsar().getBrokerService()
+
.isCurrentClusterAllowed(NamespaceName.get(namespaceName), policies);
+ return allowed ? namespaceName : null;
+ })).collect(Collectors.toList());
return
FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
__ -> namespaceNamesInCluster.stream()
.map(CompletableFuture::join)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0e821c83e61..4491701a60d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -27,7 +27,6 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -221,7 +220,8 @@ public abstract class NamespacesBase extends AdminResource {
precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
final CompletableFuture<List<String>> topicsFuture;
- if (policies == null ||
CollectionUtils.isEmpty(policies.replication_clusters)){
+ if (policies == null || !pulsar().getBrokerService()
+ .isCurrentClusterAllowed(namespaceName, policies))
{
topicsFuture =
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
} else {
topicsFuture =
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
@@ -418,22 +418,25 @@ public abstract class NamespacesBase extends
AdminResource {
return CompletableFuture.completedFuture(null);
}
Policies policies = policiesOpt.get();
- Set<String> replicationClusters =
policies.replication_clusters;
- if (replicationClusters.size() > 1) {
+ // Just keep the behavior of V1 namespace being
the same as before.
+ if (!nsName.isV2() &&
policies.replication_clusters.isEmpty()
+ && policies.allowed_clusters.isEmpty()) {
+ return
CompletableFuture.completedFuture(policies);
+ }
+ String cluster =
policies.getClusterThatCanDeleteNamespace();
+ if (cluster == null) {
// There are still more than one clusters
configured for the global namespace
throw new
RestException(Status.PRECONDITION_FAILED,
- "Cannot delete the global namespace "
+ nsName + ". There are still more than "
- + "one replication clusters
configured.");
+ "Cannot delete the global namespace " +
nsName + ". There are still more than "
+ + "one replication clusters configured.");
}
- if (replicationClusters.size() == 1
- &&
!policies.replication_clusters.contains(config().getClusterName())) {
+ if (!cluster.equals(config().getClusterName())) {
// the only replication cluster is other
cluster, redirect
- String replCluster = new
ArrayList<>(policies.replication_clusters).get(0);
- return
clusterResources().getClusterAsync(replCluster)
+ return
clusterResources().getClusterAsync(cluster)
.thenCompose(replClusterDataOpt -> {
ClusterData replClusterData =
replClusterDataOpt
.orElseThrow(() -> new
RestException(Status.NOT_FOUND,
- "Cluster " +
replCluster + " does not exist"));
+ "Cluster " +
cluster + " does not exist"));
URL replClusterUrl;
try {
if
(!replClusterData.isBrokerClientTlsEnabled()) {
@@ -453,7 +456,7 @@ public abstract class NamespacesBase extends AdminResource {
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting
the rest call to {}: cluster={}",
- clientAppId(),
redirect, replCluster);
+ clientAppId(),
redirect, cluster);
}
throw new WebApplicationException(
Response.temporaryRedirect(redirect).build());
@@ -503,22 +506,25 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(policies -> {
CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
if (namespaceName.isGlobal()) {
-
- if (policies.replication_clusters.size() > 1) {
+ // Just keep the behavior of V1 namespace being the
same as before.
+ if (!namespaceName.isV2() &&
policies.replication_clusters.isEmpty()
+ && policies.allowed_clusters.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ String cluster =
policies.getClusterThatCanDeleteNamespace();
+ if (cluster == null) {
// There are still more than one clusters
configured for the global namespace
throw new
RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ namespaceName
+ ". There are still more than one
replication clusters configured.");
}
- if (policies.replication_clusters.size() == 1
- &&
!policies.replication_clusters.contains(config().getClusterName())) {
+ if (!cluster.equals(config().getClusterName())) { //
No need to change.
// the only replication cluster is other cluster,
redirect
- String replCluster = new
ArrayList<>(policies.replication_clusters).get(0);
- future =
clusterResources().getClusterAsync(replCluster)
+ future =
clusterResources().getClusterAsync(cluster)
.thenCompose(clusterData -> {
if (clusterData.isEmpty()) {
throw new
RestException(Status.NOT_FOUND,
- "Cluster " + replCluster +
" does not exist");
+ "Cluster " + cluster + "
does not exist");
}
ClusterData replClusterData =
clusterData.get();
URL replClusterUrl;
@@ -542,7 +548,7 @@ public abstract class NamespacesBase extends AdminResource {
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the
rest call to {}: cluster={}",
- clientAppId(), redirect,
replCluster);
+ clientAppId(), redirect,
cluster);
}
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
});
@@ -739,6 +745,9 @@ public abstract class NamespacesBase extends AdminResource {
subscriptionName, role, null/* additional auth-data
json */));
}
+ /**
+ * Directly get the replication clusters for a namespace, without checking
allowed clusters.
+ */
protected CompletableFuture<Set<String>>
internalGetNamespaceReplicationClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATION, PolicyOperation.READ)
.thenAccept(__ -> {
@@ -778,21 +787,19 @@ public abstract class NamespacesBase extends
AdminResource {
"Invalid cluster id: " +
clusterId);
}
return
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
- .thenCompose(__ ->
getNamespacePoliciesAsync(this.namespaceName)
-
.thenCompose(nsPolicies -> {
- if
(nsPolicies.allowed_clusters.isEmpty()) {
- return
validateClusterForTenantAsync(
-
namespaceName.getTenant(), clusterId);
- }
- if
(!nsPolicies.allowed_clusters.contains(clusterId)) {
- String msg =
String.format("Cluster [%s] is not in the "
- +
"list of allowed clusters list for namespace "
- +
"[%s]", clusterId, namespaceName.toString());
- log.info(msg);
- throw new
RestException(Status.FORBIDDEN, msg);
- }
- return
CompletableFuture.completedFuture(null);
- }));
+ .thenCompose(__ ->
getNamespacePoliciesAsync(this.namespaceName)
+ .thenCompose(nsPolicies -> {
+ if
(!Policies.checkNewReplicationClusters(nsPolicies,
+
replicationClusterSet)) {
+ String msg =
String.format("Cluster [%s] is not in the "
+ + "list of
allowed clusters list for namespace "
+ + "[%s]",
clusterId, namespaceName.toString());
+ log.info(msg);
+ throw new
RestException(Status.BAD_REQUEST, msg);
+ }
+ return
validateClusterForTenantAsync(
+
namespaceName.getTenant(), clusterId);
+ }));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__
-> replicationClusterSet);
}))
@@ -1967,13 +1974,17 @@ public abstract class NamespacesBase extends
AdminResource {
}
private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns,
Policies policies) {
- if (ns.isV2() && policies.replication_clusters.isEmpty()) {
- // Default to local cluster
- policies.replication_clusters =
Collections.singleton(config().getClusterName());
+ if (!policies.checkAllowedAndReplicationClusters()) {
+ String msg = String.format("[%s] All replication clusters should
be included in allowed clusters."
+ + " Repl clusters: %s, allowed clusters: %s",
+ ns.toString(), policies.replication_clusters,
policies.allowed_clusters);
+ log.info(msg);
+ throw new RestException(Status.BAD_REQUEST, msg);
}
+
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns,
policies);
// Validate cluster names and permissions
- return policies.replication_clusters.stream()
+ return Stream.concat(policies.replication_clusters.stream(),
policies.allowed_clusters.stream())
.map(cluster ->
validateClusterForTenantAsync(ns.getTenant(), cluster))
.reduce(CompletableFuture.completedFuture(null), (a, b) ->
a.thenCompose(ignore -> b))
.thenAccept(__ -> {
@@ -2864,16 +2875,15 @@ public abstract class NamespacesBase extends
AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed
clusters");
}
- return
getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
-
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
- if (!clusterIds.contains(replicationCluster)) {
- throw new RestException(Status.BAD_REQUEST,
- String.format("Allowed clusters do not
contain the replication cluster %s. "
- + "Please remove the
replication cluster if the cluster is not allowed "
- + "for this namespace",
replicationCluster));
- }
- });
- return Sets.newHashSet(clusterIds);
+ return
getNamespacePoliciesAsync(this.namespaceName).thenApply(nsPolicies -> {
+ Set<String> clusterSet = Sets.newHashSet(clusterIds);
+ if (!Policies.checkNewAllowedClusters(nsPolicies,
clusterSet)){
+ throw new RestException(Status.BAD_REQUEST,
+ String.format("Allowed clusters do not
contain the replication cluster %s. "
+ + "Please remove the replication
cluster if the cluster is not allowed "
+ + "for this namespace",
nsPolicies.replication_clusters));
+ }
+ return clusterSet;
});
})
// Verify the allowed clusters are valid and they do not
contain the peer clusters.
@@ -2896,6 +2906,9 @@ public abstract class NamespacesBase extends
AdminResource {
}));
}
+ /**
+ * Directly get the allowed clusters for a namespace, without checking
replication clusters.
+ */
protected CompletableFuture<Set<String>>
internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ec61d58d2af..9318246afc1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -514,7 +514,7 @@ public class PersistentTopicsBase extends AdminResource {
return CompletableFuture.completedFuture(null);
}
// Query the topic-level policies only if the namespace-level
policies exist.
- // Global policies does not affet Replication.
+ // Global policies does not affect Replication.
final var namespacePolicies = optionalPolicies.get();
return
pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 24bce1e39bb..0ac5ae999b0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -97,6 +97,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
protected final String topic;
+ protected final NamespaceName namespace;
// Reference to the CompletableFuture returned when creating this topic in
BrokerService.
// Used to safely remove the topic from BrokerService's cache by ensuring
we remove the exact
@@ -183,6 +184,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
+ this.namespace = TopicName.get(topic).getNamespaceObject();
this.clock = brokerService.getClock();
this.brokerService = brokerService;
this.producers = new ConcurrentHashMap<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0a27a089f63..db58e2e6525 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -156,6 +156,7 @@ import
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
+import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -2770,26 +2771,27 @@ public class BrokerService implements Closeable {
return;
}
final String localCluster =
this.pulsar.getConfiguration().getClusterName();
- if (!data.replication_clusters.contains(localCluster)) {
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundlesAsync(namespace).thenAccept(bundles -> {
- bundles.getBundles().forEach(bundle -> {
-
pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist
-> {
- if (isExist) {
- this.pulsar().getExecutor().execute(() -> {
- try {
-
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
- bundle.getBundleRange());
- } catch (Exception e) {
- log.error("Failed to unload
namespace-bundle {} that not owned by {}, {}",
- bundle.toString(), localCluster,
e.getMessage());
- }
- });
- }
- });
+ if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace,
data)) {
+ return;
+ }
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundlesAsync(namespace).thenAccept(bundles -> {
+ bundles.getBundles().forEach(bundle -> {
+
pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist
-> {
+ if (isExist) {
+ this.pulsar().getExecutor().execute(() -> {
+ try {
+
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
+ bundle.getBundleRange());
+ } catch (Exception e) {
+ log.error("Failed to unload namespace-bundle
{} that not owned by {}, {}",
+ bundle.toString(), localCluster,
e.getMessage());
+ }
+ });
+ }
});
});
- }
+ });
}
public PulsarService pulsar() {
@@ -3890,4 +3892,41 @@ public class BrokerService implements Closeable {
public void
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
}
+
+ /***
+ * After PIP-321 Introduce allowed-cluster at the namespace level, the
condition that whether the cluster is
+ * allowed to access by the current cluster was defined by two fields:
+ * - {@link Policies#replication_clusters}
+ * - {@link Policies#allowed_clusters}
+ * {@link Policies#allowed_clusters} has higher priority. Once it's set,
{@link Policies#replication_clusters} only
+ * means the default replication clusters for the topics under the
namespace.
+ */
+ public boolean isCurrentClusterAllowed(NamespaceName nsName, Policies
nsPolicies) {
+ // Compatibility with v1 version namespace.
+ if (Constants.GLOBAL_CLUSTER.equalsIgnoreCase(nsName.getCluster())) {
+ return
nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName());
+ }
+ // If allowed clusters has been set, only check allowed clusters.
+ if (!nsPolicies.allowed_clusters.isEmpty()) {
+ return
nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName());
+ }
+ // Otherwise, replication clusters means allowed clusters.
+ return
nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName());
+ }
+
+ public void setCurrentClusterAllowedIfNoClusterIsAllowed(NamespaceName
nsName, Policies nsPolicies) {
+ // Compatibility with v1 version namespace.
+ if (!nsName.isV2()) {
+ return;
+ }
+ if
(nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName())
+ ||
nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName())) {
+ return;
+ }
+ if (nsPolicies.replication_clusters.isEmpty()) {
+
nsPolicies.replication_clusters.add(pulsar.getConfig().getClusterName());
+ } else {
+
nsPolicies.allowed_clusters.add(pulsar.getConfig().getClusterName());
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 58ca93a1b3c..4b28171ea99 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2088,17 +2088,17 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
protected CompletableFuture<Boolean> checkAllowedCluster(String
localCluster) {
- List<String> replicationClusters =
topicPolicies.getReplicationClusters().get();
+ List<String> topicRepls = topicPolicies.getReplicationClusters().get();
return
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
-> {
+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(nsPolicies
-> {
Set<String> allowedClusters = Set.of();
- if (policiesOptional.isPresent()) {
- allowedClusters =
policiesOptional.get().allowed_clusters;
+ if (nsPolicies.isPresent()) {
+ allowedClusters = nsPolicies.get().allowed_clusters;
}
- if (TopicName.get(topic).isGlobal() &&
!replicationClusters.contains(localCluster)
+ if (TopicName.get(topic).isGlobal() &&
!topicRepls.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
log.warn("Local cluster {} is not part of global
namespace repl list {} and allowed list {}",
- localCluster, replicationClusters,
allowedClusters);
+ localCluster, topicRepls, allowedClusters);
return CompletableFuture.completedFuture(false);
} else {
return CompletableFuture.completedFuture(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 353f2fa6f2e..65489eaa34b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -912,8 +912,7 @@ public abstract class PulsarWebResource {
localCluster, namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED, msg));
- } else if
(!policies.replication_clusters.contains(localCluster) &&
!policies.allowed_clusters
- .contains(localCluster)) {
+ } else if
(!pulsarService.getBrokerService().isCurrentClusterAllowed(namespace,
policies)) {
getOwnerFromPeerClusterListAsync(pulsarService,
policies.replication_clusters,
policies.allowed_clusters)
.thenAccept(ownerPeerCluster -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
index da7d95d677a..09d8adcacc8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
@@ -77,7 +77,8 @@ public class AdminApiNamespaceIsolationMultiBrokersTest
extends MultiBrokerBaseT
public void testNamespaceIsolationPolicyForReplNS() throws Exception {
// Verify that namespace is not present in cluster-2.
- Set<String> replicationClusters =
localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
+ Set<String> replicationClusters = localAdmin.namespaces()
+ .getPolicies("prop-ig/ns1").replication_clusters; // Nothing
is needed to be changed.
Assert.assertFalse(replicationClusters.contains("cluster-2"));
// setup ns-isolation-policy in both the clusters.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 2b24265a364..c8a75af3660 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -278,7 +278,7 @@ public class HttpTopicLookupv2Test {
destLookup.lookupTopicAsync(asyncResponse,
TopicDomain.persistent.value(), property, cluster, ns2,
"invalid-localCluster", false, null, null);
verify(asyncResponse).resume(arg.capture());
- assertEquals(arg.getValue().getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
+ assertEquals(arg.getValue().getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 094a29c1c6c..5d88cd38813 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -868,6 +868,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
"Cluster [r3] is not in the list of allowed clusters list
for tenant [my-tenant]");
}
// 3. Clean up
+ admin.namespaces().setNamespaceAllowedClusters(namespace,
Set.of(pulsar.getConfig().getClusterName()));
admin.namespaces().deleteNamespace(namespace, true);
admin.tenants().deleteTenant(tenant, true);
for (String cluster : clusters) {
@@ -935,7 +936,9 @@ public class NamespaceServiceTest extends BrokerTestBase {
namespaces.setNamespaceReplicationClusters(namespace,
replicationClustersExcel);
fail();
//Todo: The status code in the old implementation is confused.
- } catch (PulsarAdminException.NotAuthorizedException ignore) {}
+ } catch (PulsarAdminException ignore) {
+ assertTrue(ignore.getMessage().contains("allowed clusters list"));
+ }
// 2.2 Peer cluster can not be a part of the allowed clusters.
LinkedHashSet<String> peerCluster = new LinkedHashSet<>();
@@ -950,6 +953,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
// CleanUp: Namespace with replication clusters can not be deleted by
force.
namespaces.setNamespaceReplicationClusters(namespace,
Set.of(conf.getClusterName()));
+ namespaces.setNamespaceAllowedClusters(namespace,
Set.of(conf.getClusterName()));
admin.namespaces().deleteNamespace(namespace, true);
admin.tenants().deleteTenant(tenant, true);
for (String cluster : clusters) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 18f601d51b3..a0db3c8e438 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -24,9 +24,13 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -37,11 +41,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -545,4 +554,132 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters()
throws Exception {
super.testPartitionedTopicWithTopicPolicyAndNoReplicationClusters();
}
+
+ @Test
+ public void testUpdateNamespaceIsolationPolicy() throws Exception {
+ // Create a namespace and allow both clusters to access.
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
ns1 + "/tp");
+ admin2.namespaces().createNamespace(ns1);
+ admin2.namespaces().setNamespaceAllowedClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ admin1.topics().createNonPartitionedTopic(topic);
+ Producer<String> p =
client1.newProducer(Schema.STRING).topic(topic).create();
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar1.getBrokerService().getTopic(topic, false)
+ .join().get();
+
+ // Set namespace isolation policy.
+ // It will trigger a namespace unloading.
+ String policyName = "policy-1";
+ String namespaceRegex = ns1;
+ String brokerName = pulsar1.getAdvertisedAddress();
+ Map<String, String> parameters1 = new HashMap<>();
+ parameters1.put("min_limit", "1");
+ parameters1.put("usage_threshold", "100");
+ NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
+ .namespaces(Collections.singletonList(namespaceRegex))
+ .primary(Collections.singletonList(brokerName))
+ .secondary(Collections.singletonList(brokerName + ".*"))
+ .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+ .policyType(AutoFailoverPolicyType.min_available)
+ .parameters(parameters1)
+ .build())
+ .build();
+ admin1.clusters().createNamespaceIsolationPolicy(cluster1, policyName,
nsPolicyData1);
+
+ // Verify: the namespace was unloaded.
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(persistentTopic.isClosingOrDeleting());
+ });
+ // Verify: the producer still works.
+ p.send("msg-1");
+
+ // cleanup.
+ p.close();
+ admin1.clusters().deleteNamespaceIsolationPolicy(cluster1, policyName);
+ admin1.topics().delete(topic);
+ }
+
+ /**
+ * Namespace deletion should not be allowed if more than one cluster is
allowed to access.
+ */
+ @Test
+ public void testDeleteNamespaceIfTwoClustersAllowed() throws Exception {
+ // Create a namespace and allow both clusters to access.
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ admin1.namespaces().createNamespace(ns1);
+ Awaitility.await().untilAsserted(() -> {
+ List<String> clusters =
admin1.namespaces().getNamespaceReplicationClusters(ns1);
+ assertEquals(clusters.size(), 1);
+ assertTrue(clusters.contains(cluster1));
+ });
+ admin1.namespaces().setNamespaceAllowedClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ Awaitility.await().untilAsserted(() -> {
+ List<String> clusters =
admin1.namespaces().getNamespaceAllowedClusters(ns1);
+ assertEquals(clusters.size(), 2);
+ assertTrue(clusters.contains(cluster1));
+ assertTrue(clusters.contains(cluster2));
+ });
+ try {
+ admin1.namespaces().deleteNamespace(ns1);
+ fail("namespace deletion should not be allowed if more than one
cluster to access");
+ } catch (PulsarAdminException.PreconditionFailedException e) {
+ // expected.
+ }
+ }
+
+ @Test
+ public void testSetClustersAndAllowedClusters() throws Exception {
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ admin1.namespaces().createNamespace(ns1);
+ Awaitility.await().untilAsserted(() -> {
+ List<String> clusters =
admin1.namespaces().getNamespaceReplicationClusters(ns1);
+ assertEquals(clusters.size(), 1);
+ assertTrue(clusters.contains(cluster1));
+ });
+
+ // New allowed clusters should include all replication clusters
+ try {
+ admin1.namespaces().setNamespaceAllowedClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
+ fail("New allowed clusters should include all replication
clusters.");
+ } catch (PulsarAdminException e) {
+ assertTrue(e.getMessage().contains("do not contain the replication
cluster"));
+ }
+
+ admin1.namespaces().setNamespaceAllowedClusters(ns1, new
HashSet<>(Arrays.asList(cluster1)));
+
+ // New replication clusters should be included in allowed clusters.
+ try {
+ admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ fail("New replication clusters should be included in allowed
clusters.");
+ } catch (PulsarAdminException e) {
+ assertTrue(e.getMessage().contains("is not in the list of allowed
clusters list"));
+ }
+ }
+
+ @Test
+ public void testUpdateNamespacePolicies() throws Exception {
+ // Create a namespace and allow both clusters to access.
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
ns1 + "/tp");
+ admin2.namespaces().createNamespace(ns1);
+ admin2.namespaces().setNamespaceAllowedClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ admin1.topics().createNonPartitionedTopic(topic);
+ Producer<String> p =
client1.newProducer(Schema.STRING).topic(topic).create();
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar1.getBrokerService().getTopic(topic, false)
+ .join().get();
+
+ admin1.namespaces().setRetention(ns1, new RetentionPolicies(10, 10));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin1.namespaces().getRetention(ns1), new
RetentionPolicies(10, 10));
+ });
+
+ // Verify: the namespace will not be unloaded, because the topic can
be updated in memory.
+ assertFalse(persistentTopic.isClosingOrDeleting());
+ // Verify: the producer still works.
+ p.send("msg-1");
+
+ // cleanup.
+ p.close();
+ admin1.topics().delete(topic);
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index d5e08a1f50c..a24df3e7ad4 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -36,6 +36,10 @@ public class Policies {
public final AuthPolicies auth_policies = AuthPolicies.builder().build();
@SuppressWarnings("checkstyle:MemberName")
public Set<String> replication_clusters = new HashSet<>();
+ /**
+ * This field has a unique usage: defines whether a namespace is allowed
to access by the current cluster.
+ * Instead of access this field directly, please call {@link
BrokerService#isCurrentClusterAllowed}.
+ */
@SuppressWarnings("checkstyle:MemberName")
public Set<String> allowed_clusters = new HashSet<>();
public BundlesData bundles;
@@ -216,9 +220,64 @@ public class Policies {
&&
Objects.equals(dispatcherPauseOnAckStatePersistentEnabled,
other.dispatcherPauseOnAckStatePersistentEnabled);
}
-
return false;
}
+ /**
+ * Get the cluster that can delete the namespace.
+ */
+ public String getClusterThatCanDeleteNamespace() {
+ if (this.replication_clusters.size() != 1 ||
this.allowed_clusters.size() > 1) {
+ return null;
+ }
+ String cluster = this.replication_clusters.iterator().next();
+ // The namespace can be deleted if the current cluster is the only one
cluster who can access it.
+ if (!this.allowed_clusters.isEmpty() &&
this.allowed_clusters.contains(cluster)) {
+ return cluster;
+ } else {
+ return cluster;
+ }
+ }
+
+ /**
+ * Replication clusters should be included in allowed clusters.
+ */
+ public static boolean checkNewReplicationClusters(Policies oldNsPolicies,
Set<String> newReplicationClusters) {
+ if (oldNsPolicies.allowed_clusters.isEmpty()) {
+ return true;
+ }
+ for (String newCluster : newReplicationClusters) {
+ if (!oldNsPolicies.allowed_clusters.contains(newCluster)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Allowed cluster should contain all clusters that are defined in
replication clusters.
+ */
+ public static boolean checkNewAllowedClusters(Policies oldNsPolicies,
Set<String> newAllowedClusters) {
+ for (String oldReplicationCluster :
oldNsPolicies.replication_clusters) {
+ if (!newAllowedClusters.contains(oldReplicationCluster)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ /**
+ * Replication clusters should be included in allowed clusters if allowed
clusters are not empty.
+ */
+ public boolean checkAllowedAndReplicationClusters() {
+ if (this.allowed_clusters.isEmpty()) {
+ return true;
+ }
+ for (String replicationCluster : this.replication_clusters) {
+ if (!this.allowed_clusters.contains(replicationCluster)) {
+ return false;
+ }
+ }
+ return true;
+ }
}