This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39bb67542f2 [fix][broker] Fix wrong behaviour when using 
namespace.allowed_clusters, such as namespace deletion and namespace policies 
updating (#24860)
39bb67542f2 is described below

commit 39bb67542f2a7b849acaff681d408c693e1a2a18
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 31 01:00:25 2025 +0800

    [fix][broker] Fix wrong behaviour when using namespace.allowed_clusters, 
such as namespace deletion and namespace policies updating (#24860)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   3 +
 .../pulsar/broker/admin/impl/ClustersBase.java     |   9 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 111 +++++++++--------
 .../broker/admin/impl/PersistentTopicsBase.java    |   2 +-
 .../pulsar/broker/service/AbstractTopic.java       |   2 +
 .../pulsar/broker/service/BrokerService.java       |  75 ++++++++---
 .../broker/service/persistent/PersistentTopic.java |  12 +-
 .../pulsar/broker/web/PulsarWebResource.java       |   3 +-
 ...AdminApiNamespaceIsolationMultiBrokersTest.java |   3 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |   2 +-
 .../broker/namespace/NamespaceServiceTest.java     |   6 +-
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 137 +++++++++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |  61 ++++++++-
 13 files changed, 343 insertions(+), 83 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 84d55430f8f..bfa1fdc812b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -514,6 +514,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return getNamespacePolicies(ns);
     }
 
+    /**
+     * Directly get the replication clusters for a namespace, without checking 
allowed clusters.
+     */
     protected CompletableFuture<Set<String>> 
getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
         return namespaceResources().getPoliciesAsync(namespaceName)
                 .thenApply(policies -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index d24a3255b55..0ae18fc2e54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -792,9 +793,11 @@ public class ClustersBase extends AdminResource {
                     .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
                         List<CompletableFuture<String>> 
namespaceNamesInCluster = namespaces.stream()
                                 .map(namespaceName -> 
adminClient.namespaces().getPoliciesAsync(namespaceName)
-                                        .thenApply(policies -> 
policies.replication_clusters.contains(cluster)
-                                                ? namespaceName : null))
-                                .collect(Collectors.toList());
+                                    .thenApply(policies -> {
+                                        boolean allowed = 
pulsar().getBrokerService()
+                                            
.isCurrentClusterAllowed(NamespaceName.get(namespaceName), policies);
+                                        return allowed ? namespaceName : null;
+                                    })).collect(Collectors.toList());
                         return 
FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
                                 __ -> namespaceNamesInCluster.stream()
                                         .map(CompletableFuture::join)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0e821c83e61..4491701a60d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -27,7 +27,6 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -221,7 +220,8 @@ public abstract class NamespacesBase extends AdminResource {
         precheckWhenDeleteNamespace(namespaceName, force)
                 .thenCompose(policies -> {
                     final CompletableFuture<List<String>> topicsFuture;
-                    if (policies == null || 
CollectionUtils.isEmpty(policies.replication_clusters)){
+                    if (policies == null || !pulsar().getBrokerService()
+                            .isCurrentClusterAllowed(namespaceName, policies)) 
{
                         topicsFuture = 
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
                     } else {
                         topicsFuture = 
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
@@ -418,22 +418,25 @@ public abstract class NamespacesBase extends 
AdminResource {
                                 return CompletableFuture.completedFuture(null);
                             }
                             Policies policies = policiesOpt.get();
-                            Set<String> replicationClusters = 
policies.replication_clusters;
-                            if (replicationClusters.size() > 1) {
+                            // Just keep the behavior of V1 namespace being 
the same as before.
+                            if (!nsName.isV2() && 
policies.replication_clusters.isEmpty()
+                                    && policies.allowed_clusters.isEmpty()) {
+                                return 
CompletableFuture.completedFuture(policies);
+                            }
+                            String cluster = 
policies.getClusterThatCanDeleteNamespace();
+                            if (cluster == null) {
                                 // There are still more than one clusters 
configured for the global namespace
                                 throw new 
RestException(Status.PRECONDITION_FAILED,
-                                        "Cannot delete the global namespace " 
+ nsName + ". There are still more than "
-                                        + "one replication clusters 
configured.");
+                                    "Cannot delete the global namespace " + 
nsName + ". There are still more than "
+                                    + "one replication clusters configured.");
                             }
-                            if (replicationClusters.size() == 1
-                                    && 
!policies.replication_clusters.contains(config().getClusterName())) {
+                            if (!cluster.equals(config().getClusterName())) {
                                 // the only replication cluster is other 
cluster, redirect
-                                String replCluster = new 
ArrayList<>(policies.replication_clusters).get(0);
-                                return 
clusterResources().getClusterAsync(replCluster)
+                                return 
clusterResources().getClusterAsync(cluster)
                                         .thenCompose(replClusterDataOpt -> {
                                             ClusterData replClusterData = 
replClusterDataOpt
                                                     .orElseThrow(() -> new 
RestException(Status.NOT_FOUND,
-                                                            "Cluster " + 
replCluster + " does not exist"));
+                                                            "Cluster " + 
cluster + " does not exist"));
                                             URL replClusterUrl;
                                             try {
                                                 if 
(!replClusterData.isBrokerClientTlsEnabled()) {
@@ -453,7 +456,7 @@ public abstract class NamespacesBase extends AdminResource {
                                                     
.replaceQueryParam("authoritative", false).build();
                                             if (log.isDebugEnabled()) {
                                                 log.debug("[{}] Redirecting 
the rest call to {}: cluster={}",
-                                                        clientAppId(), 
redirect, replCluster);
+                                                        clientAppId(), 
redirect, cluster);
                                             }
                                             throw new WebApplicationException(
                                                     
Response.temporaryRedirect(redirect).build());
@@ -503,22 +506,25 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenCompose(policies -> {
                     CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
                     if (namespaceName.isGlobal()) {
-
-                        if (policies.replication_clusters.size() > 1) {
+                        // Just keep the behavior of V1 namespace being the 
same as before.
+                        if (!namespaceName.isV2() && 
policies.replication_clusters.isEmpty()
+                                && policies.allowed_clusters.isEmpty()) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                        String cluster = 
policies.getClusterThatCanDeleteNamespace();
+                        if (cluster == null) {
                             // There are still more than one clusters 
configured for the global namespace
                             throw new 
RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
                                     + namespaceName
                                     + ". There are still more than one 
replication clusters configured.");
                         }
-                        if (policies.replication_clusters.size() == 1
-                                && 
!policies.replication_clusters.contains(config().getClusterName())) {
+                        if (!cluster.equals(config().getClusterName())) { // 
No need to change.
                             // the only replication cluster is other cluster, 
redirect
-                            String replCluster = new 
ArrayList<>(policies.replication_clusters).get(0);
-                            future = 
clusterResources().getClusterAsync(replCluster)
+                            future = 
clusterResources().getClusterAsync(cluster)
                                     .thenCompose(clusterData -> {
                                         if (clusterData.isEmpty()) {
                                             throw new 
RestException(Status.NOT_FOUND,
-                                                    "Cluster " + replCluster + 
" does not exist");
+                                                    "Cluster " + cluster + " 
does not exist");
                                         }
                                         ClusterData replClusterData = 
clusterData.get();
                                         URL replClusterUrl;
@@ -542,7 +548,7 @@ public abstract class NamespacesBase extends AdminResource {
                                                         
.replaceQueryParam("authoritative", false).build();
                                         if (log.isDebugEnabled()) {
                                             log.debug("[{}] Redirecting the 
rest call to {}: cluster={}",
-                                                    clientAppId(), redirect, 
replCluster);
+                                                    clientAppId(), redirect, 
cluster);
                                         }
                                         throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
                                     });
@@ -739,6 +745,9 @@ public abstract class NamespacesBase extends AdminResource {
                         subscriptionName, role, null/* additional auth-data 
json */));
     }
 
+    /**
+     * Directly get the replication clusters for a namespace, without checking 
allowed clusters.
+     */
     protected CompletableFuture<Set<String>> 
internalGetNamespaceReplicationClustersAsync() {
         return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.READ)
                 .thenAccept(__ -> {
@@ -778,21 +787,19 @@ public abstract class NamespacesBase extends 
AdminResource {
                                                     "Invalid cluster id: " + 
clusterId);
                                         }
                                         return 
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
-                                                .thenCompose(__ -> 
getNamespacePoliciesAsync(this.namespaceName)
-                                                        
.thenCompose(nsPolicies -> {
-                                                            if 
(nsPolicies.allowed_clusters.isEmpty()) {
-                                                                return 
validateClusterForTenantAsync(
-                                                                        
namespaceName.getTenant(), clusterId);
-                                                            }
-                                                            if 
(!nsPolicies.allowed_clusters.contains(clusterId)) {
-                                                                String msg = 
String.format("Cluster [%s] is not in the "
-                                                                        + 
"list of allowed clusters list for namespace "
-                                                                        + 
"[%s]", clusterId, namespaceName.toString());
-                                                                log.info(msg);
-                                                                throw new 
RestException(Status.FORBIDDEN, msg);
-                                                            }
-                                                            return 
CompletableFuture.completedFuture(null);
-                                                        }));
+                                            .thenCompose(__ -> 
getNamespacePoliciesAsync(this.namespaceName)
+                                                .thenCompose(nsPolicies -> {
+                                                    if 
(!Policies.checkNewReplicationClusters(nsPolicies,
+                                                            
replicationClusterSet)) {
+                                                        String msg = 
String.format("Cluster [%s] is not in the "
+                                                                + "list of 
allowed clusters list for namespace "
+                                                                + "[%s]", 
clusterId, namespaceName.toString());
+                                                        log.info(msg);
+                                                        throw new 
RestException(Status.BAD_REQUEST, msg);
+                                                    }
+                                                    return 
validateClusterForTenantAsync(
+                                                            
namespaceName.getTenant(), clusterId);
+                                                }));
                                     }).collect(Collectors.toList());
                             return FutureUtil.waitForAll(futures).thenApply(__ 
-> replicationClusterSet);
                         }))
@@ -1967,13 +1974,17 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, 
Policies policies) {
-        if (ns.isV2() && policies.replication_clusters.isEmpty()) {
-            // Default to local cluster
-            policies.replication_clusters = 
Collections.singleton(config().getClusterName());
+        if (!policies.checkAllowedAndReplicationClusters()) {
+            String msg = String.format("[%s] All replication clusters should 
be included in allowed clusters."
+                    + " Repl clusters: %s, allowed clusters: %s",
+                    ns.toString(), policies.replication_clusters, 
policies.allowed_clusters);
+            log.info(msg);
+            throw new RestException(Status.BAD_REQUEST, msg);
         }
+        
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, 
policies);
 
         // Validate cluster names and permissions
-        return policies.replication_clusters.stream()
+        return Stream.concat(policies.replication_clusters.stream(), 
policies.allowed_clusters.stream())
                     .map(cluster -> 
validateClusterForTenantAsync(ns.getTenant(), cluster))
                     .reduce(CompletableFuture.completedFuture(null), (a, b) -> 
a.thenCompose(ignore -> b))
             .thenAccept(__ -> {
@@ -2864,16 +2875,15 @@ public abstract class NamespacesBase extends 
AdminResource {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of allowed 
clusters");
                     }
-                    return 
getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
-                        
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
-                            if (!clusterIds.contains(replicationCluster)) {
-                                throw new RestException(Status.BAD_REQUEST,
-                                        String.format("Allowed clusters do not 
contain the replication cluster %s. "
-                                                + "Please remove the 
replication cluster if the cluster is not allowed "
-                                                + "for this namespace", 
replicationCluster));
-                            }
-                        });
-                        return Sets.newHashSet(clusterIds);
+                    return 
getNamespacePoliciesAsync(this.namespaceName).thenApply(nsPolicies -> {
+                        Set<String> clusterSet = Sets.newHashSet(clusterIds);
+                        if (!Policies.checkNewAllowedClusters(nsPolicies, 
clusterSet)){
+                            throw new RestException(Status.BAD_REQUEST,
+                                    String.format("Allowed clusters do not 
contain the replication cluster %s. "
+                                        + "Please remove the replication 
cluster if the cluster is not allowed "
+                                        + "for this namespace", 
nsPolicies.replication_clusters));
+                        }
+                        return clusterSet;
                     });
                 })
                 // Verify the allowed clusters are valid and they do not 
contain the peer clusters.
@@ -2896,6 +2906,9 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }));
     }
 
+    /**
+     * Directly get the allowed clusters for a namespace, without checking 
replication clusters.
+     */
     protected CompletableFuture<Set<String>> 
internalGetNamespaceAllowedClustersAsync() {
         return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
                 .thenAccept(__ -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ec61d58d2af..9318246afc1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -514,7 +514,7 @@ public class PersistentTopicsBase extends AdminResource {
                 return CompletableFuture.completedFuture(null);
             }
             // Query the topic-level policies only if the namespace-level 
policies exist.
-            // Global policies does not affet Replication.
+            // Global policies does not affect Replication.
             final var namespacePolicies = optionalPolicies.get();
             return 
pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
                     TopicPoliciesService.GetType.LOCAL_ONLY
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 24bce1e39bb..0ac5ae999b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -97,6 +97,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
     protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
 
     protected final String topic;
+    protected final NamespaceName namespace;
 
     // Reference to the CompletableFuture returned when creating this topic in 
BrokerService.
     // Used to safely remove the topic from BrokerService's cache by ensuring 
we remove the exact
@@ -183,6 +184,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
+        this.namespace = TopicName.get(topic).getNamespaceObject();
         this.clock = brokerService.getClock();
         this.brokerService = brokerService;
         this.producers = new ConcurrentHashMap<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0a27a089f63..db58e2e6525 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -156,6 +156,7 @@ import 
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
+import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -2770,26 +2771,27 @@ public class BrokerService implements Closeable {
             return;
         }
         final String localCluster = 
this.pulsar.getConfiguration().getClusterName();
-        if (!data.replication_clusters.contains(localCluster)) {
-            pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundlesAsync(namespace).thenAccept(bundles -> {
-                bundles.getBundles().forEach(bundle -> {
-                    
pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist 
-> {
-                        if (isExist) {
-                            this.pulsar().getExecutor().execute(() -> {
-                                try {
-                                    
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
-                                            bundle.getBundleRange());
-                                } catch (Exception e) {
-                                    log.error("Failed to unload 
namespace-bundle {} that not owned by {}, {}",
-                                            bundle.toString(), localCluster, 
e.getMessage());
-                                }
-                            });
-                        }
-                    });
+        if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace, 
data)) {
+            return;
+        }
+        pulsar().getNamespaceService().getNamespaceBundleFactory()
+                .getBundlesAsync(namespace).thenAccept(bundles -> {
+            bundles.getBundles().forEach(bundle -> {
+                
pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist 
-> {
+                    if (isExist) {
+                        this.pulsar().getExecutor().execute(() -> {
+                            try {
+                                
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
+                                        bundle.getBundleRange());
+                            } catch (Exception e) {
+                                log.error("Failed to unload namespace-bundle 
{} that not owned by {}, {}",
+                                        bundle.toString(), localCluster, 
e.getMessage());
+                            }
+                        });
+                    }
                 });
             });
-        }
+        });
     }
 
     public PulsarService pulsar() {
@@ -3890,4 +3892,41 @@ public class BrokerService implements Closeable {
     public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
         this.pulsarChannelInitFactory = factory;
     }
+
+    /***
+     * After PIP-321 Introduce allowed-cluster at the namespace level, the 
condition that whether the cluster is
+     * allowed to access by the current cluster was defined by two fields:
+     * - {@link Policies#replication_clusters}
+     * - {@link Policies#allowed_clusters}
+     * {@link Policies#allowed_clusters} has higher priority. Once it's set, 
{@link Policies#replication_clusters} only
+     * means the default replication clusters for the topics under the 
namespace.
+     */
+    public boolean isCurrentClusterAllowed(NamespaceName nsName, Policies 
nsPolicies) {
+        // Compatibility with v1 version namespace.
+        if (Constants.GLOBAL_CLUSTER.equalsIgnoreCase(nsName.getCluster())) {
+            return 
nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName());
+        }
+        // If allowed clusters has been set, only check allowed clusters.
+        if (!nsPolicies.allowed_clusters.isEmpty()) {
+            return 
nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName());
+        }
+        // Otherwise, replication clusters means allowed clusters.
+        return 
nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName());
+    }
+
+    public void setCurrentClusterAllowedIfNoClusterIsAllowed(NamespaceName 
nsName, Policies nsPolicies) {
+        // Compatibility with v1 version namespace.
+        if (!nsName.isV2()) {
+            return;
+        }
+        if 
(nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName())
+                || 
nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName())) {
+            return;
+        }
+        if (nsPolicies.replication_clusters.isEmpty()) {
+            
nsPolicies.replication_clusters.add(pulsar.getConfig().getClusterName());
+        } else {
+            
nsPolicies.allowed_clusters.add(pulsar.getConfig().getClusterName());
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 58ca93a1b3c..4b28171ea99 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2088,17 +2088,17 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     protected CompletableFuture<Boolean> checkAllowedCluster(String 
localCluster) {
-        List<String> replicationClusters = 
topicPolicies.getReplicationClusters().get();
+        List<String> topicRepls = topicPolicies.getReplicationClusters().get();
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
 -> {
+                
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(nsPolicies
 -> {
                     Set<String> allowedClusters = Set.of();
-                    if (policiesOptional.isPresent()) {
-                        allowedClusters = 
policiesOptional.get().allowed_clusters;
+                    if (nsPolicies.isPresent()) {
+                        allowedClusters = nsPolicies.get().allowed_clusters;
                     }
-                    if (TopicName.get(topic).isGlobal() && 
!replicationClusters.contains(localCluster)
+                    if (TopicName.get(topic).isGlobal() && 
!topicRepls.contains(localCluster)
                             && !allowedClusters.contains(localCluster)) {
                         log.warn("Local cluster {} is not part of global 
namespace repl list {} and allowed list {}",
-                                localCluster, replicationClusters, 
allowedClusters);
+                                localCluster, topicRepls, allowedClusters);
                         return CompletableFuture.completedFuture(false);
                     } else {
                         return CompletableFuture.completedFuture(true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 353f2fa6f2e..65489eaa34b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -912,8 +912,7 @@ public abstract class PulsarWebResource {
                             localCluster, namespace.toString());
                     log.warn(msg);
                     validationFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED, msg));
-                } else if 
(!policies.replication_clusters.contains(localCluster) && 
!policies.allowed_clusters
-                        .contains(localCluster)) {
+                } else if 
(!pulsarService.getBrokerService().isCurrentClusterAllowed(namespace, 
policies)) {
                     getOwnerFromPeerClusterListAsync(pulsarService, 
policies.replication_clusters,
                             policies.allowed_clusters)
                             .thenAccept(ownerPeerCluster -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
index da7d95d677a..09d8adcacc8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
@@ -77,7 +77,8 @@ public class AdminApiNamespaceIsolationMultiBrokersTest 
extends MultiBrokerBaseT
     public void testNamespaceIsolationPolicyForReplNS() throws Exception {
 
         // Verify that namespace is not present in cluster-2.
-        Set<String> replicationClusters = 
localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
+        Set<String> replicationClusters = localAdmin.namespaces()
+                .getPolicies("prop-ig/ns1").replication_clusters; // Nothing 
is needed to be changed.
         Assert.assertFalse(replicationClusters.contains("cluster-2"));
 
         // setup ns-isolation-policy in both the clusters.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 2b24265a364..c8a75af3660 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -278,7 +278,7 @@ public class HttpTopicLookupv2Test {
         destLookup.lookupTopicAsync(asyncResponse, 
TopicDomain.persistent.value(), property, cluster, ns2,
                 "invalid-localCluster", false, null, null);
         verify(asyncResponse).resume(arg.capture());
-        assertEquals(arg.getValue().getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
+        assertEquals(arg.getValue().getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 094a29c1c6c..5d88cd38813 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -868,6 +868,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
                     "Cluster [r3] is not in the list of allowed clusters list 
for tenant [my-tenant]");
         }
         // 3. Clean up
+        admin.namespaces().setNamespaceAllowedClusters(namespace, 
Set.of(pulsar.getConfig().getClusterName()));
         admin.namespaces().deleteNamespace(namespace, true);
         admin.tenants().deleteTenant(tenant, true);
         for (String cluster : clusters) {
@@ -935,7 +936,9 @@ public class NamespaceServiceTest extends BrokerTestBase {
             namespaces.setNamespaceReplicationClusters(namespace, 
replicationClustersExcel);
             fail();
             //Todo: The status code in the old implementation is confused.
-        } catch (PulsarAdminException.NotAuthorizedException ignore) {}
+        } catch (PulsarAdminException ignore) {
+            assertTrue(ignore.getMessage().contains("allowed clusters list"));
+        }
 
         // 2.2 Peer cluster can not be a part of the allowed clusters.
         LinkedHashSet<String> peerCluster = new LinkedHashSet<>();
@@ -950,6 +953,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
         // CleanUp: Namespace with replication clusters can not be deleted by 
force.
         namespaces.setNamespaceReplicationClusters(namespace, 
Set.of(conf.getClusterName()));
+        namespaces.setNamespaceAllowedClusters(namespace, 
Set.of(conf.getClusterName()));
         admin.namespaces().deleteNamespace(namespace, true);
         admin.tenants().deleteTenant(tenant, true);
         for (String cluster : clusters) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 18f601d51b3..a0db3c8e438 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -24,9 +24,13 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -37,11 +41,16 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -545,4 +554,132 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters() 
throws Exception {
         super.testPartitionedTopicWithTopicPolicyAndNoReplicationClusters();
     }
+
+    @Test
+    public void testUpdateNamespaceIsolationPolicy() throws Exception {
+        // Create a namespace and allow both clusters to access.
+        final String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp");
+        admin2.namespaces().createNamespace(ns1);
+        admin2.namespaces().setNamespaceAllowedClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin1.topics().createNonPartitionedTopic(topic);
+        Producer<String> p = 
client1.newProducer(Schema.STRING).topic(topic).create();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topic, false)
+                .join().get();
+
+        // Set namespace isolation policy.
+        // It will trigger a namespace unloading.
+        String policyName = "policy-1";
+        String namespaceRegex = ns1;
+        String brokerName = pulsar1.getAdvertisedAddress();
+        Map<String, String> parameters1 = new HashMap<>();
+        parameters1.put("min_limit", "1");
+        parameters1.put("usage_threshold", "100");
+        NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
+                .namespaces(Collections.singletonList(namespaceRegex))
+                .primary(Collections.singletonList(brokerName))
+                .secondary(Collections.singletonList(brokerName + ".*"))
+                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+                        .policyType(AutoFailoverPolicyType.min_available)
+                        .parameters(parameters1)
+                        .build())
+                .build();
+        admin1.clusters().createNamespaceIsolationPolicy(cluster1, policyName, 
nsPolicyData1);
+
+        // Verify: the namespace was unloaded.
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(persistentTopic.isClosingOrDeleting());
+        });
+        // Verify: the producer still works.
+        p.send("msg-1");
+
+        // cleanup.
+        p.close();
+        admin1.clusters().deleteNamespaceIsolationPolicy(cluster1, policyName);
+        admin1.topics().delete(topic);
+    }
+
+    /**
+     * Namespace deletion should not be allowed if more than one cluster is 
allowed to access.
+     */
+    @Test
+    public void testDeleteNamespaceIfTwoClustersAllowed() throws Exception {
+        // Create a namespace and allow both clusters to access.
+        final String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns1);
+        Awaitility.await().untilAsserted(() -> {
+            List<String> clusters = 
admin1.namespaces().getNamespaceReplicationClusters(ns1);
+            assertEquals(clusters.size(), 1);
+            assertTrue(clusters.contains(cluster1));
+        });
+        admin1.namespaces().setNamespaceAllowedClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        Awaitility.await().untilAsserted(() -> {
+            List<String> clusters = 
admin1.namespaces().getNamespaceAllowedClusters(ns1);
+            assertEquals(clusters.size(), 2);
+            assertTrue(clusters.contains(cluster1));
+            assertTrue(clusters.contains(cluster2));
+        });
+        try {
+            admin1.namespaces().deleteNamespace(ns1);
+            fail("namespace deletion should not be allowed if more than one 
cluster to access");
+        } catch (PulsarAdminException.PreconditionFailedException e) {
+            // expected.
+        }
+    }
+
+    @Test
+    public void testSetClustersAndAllowedClusters() throws Exception {
+        final String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns1);
+        Awaitility.await().untilAsserted(() -> {
+            List<String> clusters = 
admin1.namespaces().getNamespaceReplicationClusters(ns1);
+            assertEquals(clusters.size(), 1);
+            assertTrue(clusters.contains(cluster1));
+        });
+
+        // New allowed clusters should include all replication clusters
+        try {
+            admin1.namespaces().setNamespaceAllowedClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
+            fail("New allowed clusters should include all replication 
clusters.");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("do not contain the replication 
cluster"));
+        }
+
+        admin1.namespaces().setNamespaceAllowedClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1)));
+
+        // New replication clusters should be included in allowed clusters.
+        try {
+            admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+            fail("New replication clusters should be included in allowed 
clusters.");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("is not in the list of allowed 
clusters list"));
+        }
+    }
+
+    @Test
+    public void testUpdateNamespacePolicies() throws Exception {
+        // Create a namespace and allow both clusters to access.
+        final String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp");
+        admin2.namespaces().createNamespace(ns1);
+        admin2.namespaces().setNamespaceAllowedClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin1.topics().createNonPartitionedTopic(topic);
+        Producer<String> p = 
client1.newProducer(Schema.STRING).topic(topic).create();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topic, false)
+                .join().get();
+
+        admin1.namespaces().setRetention(ns1, new RetentionPolicies(10, 10));
+        Awaitility.await().untilAsserted(() -> {
+           assertEquals(admin1.namespaces().getRetention(ns1), new 
RetentionPolicies(10, 10));
+        });
+
+        // Verify: the namespace will not be unloaded, because the topic can 
be updated in memory.
+        assertFalse(persistentTopic.isClosingOrDeleting());
+        // Verify: the producer still works.
+        p.send("msg-1");
+
+        // cleanup.
+        p.close();
+        admin1.topics().delete(topic);
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index d5e08a1f50c..a24df3e7ad4 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -36,6 +36,10 @@ public class Policies {
     public final AuthPolicies auth_policies = AuthPolicies.builder().build();
     @SuppressWarnings("checkstyle:MemberName")
     public Set<String> replication_clusters = new HashSet<>();
+    /**
+     * This field has a unique usage: defines whether a namespace is allowed 
to access by the current cluster.
+     * Instead of access this field directly, please call {@link 
BrokerService#isCurrentClusterAllowed}.
+     */
     @SuppressWarnings("checkstyle:MemberName")
     public Set<String> allowed_clusters = new HashSet<>();
     public BundlesData bundles;
@@ -216,9 +220,64 @@ public class Policies {
                     && 
Objects.equals(dispatcherPauseOnAckStatePersistentEnabled,
                     other.dispatcherPauseOnAckStatePersistentEnabled);
         }
-
         return false;
     }
 
+    /**
+     * Get the cluster that can delete the namespace.
+     */
+    public String getClusterThatCanDeleteNamespace() {
+        if (this.replication_clusters.size() != 1 ||  
this.allowed_clusters.size() > 1) {
+            return null;
+        }
+        String cluster = this.replication_clusters.iterator().next();
+        // The namespace can be deleted if the current cluster is the only one 
cluster who can access it.
+        if (!this.allowed_clusters.isEmpty() && 
this.allowed_clusters.contains(cluster)) {
+            return cluster;
+        } else {
+            return cluster;
+        }
+    }
+
+    /**
+     * Replication clusters should be included in allowed clusters.
+     */
+    public static boolean checkNewReplicationClusters(Policies oldNsPolicies, 
Set<String> newReplicationClusters) {
+        if (oldNsPolicies.allowed_clusters.isEmpty()) {
+            return true;
+        }
+        for (String newCluster : newReplicationClusters) {
+            if (!oldNsPolicies.allowed_clusters.contains(newCluster)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Allowed cluster should contain all clusters that are defined in 
replication clusters.
+     */
+    public static boolean checkNewAllowedClusters(Policies oldNsPolicies, 
Set<String> newAllowedClusters) {
+        for (String oldReplicationCluster : 
oldNsPolicies.replication_clusters) {
+            if (!newAllowedClusters.contains(oldReplicationCluster)) {
+                return false;
+            }
+        }
+        return true;
+    }
 
+    /**
+     * Replication clusters should be included in allowed clusters if allowed 
clusters are not empty.
+     */
+    public boolean checkAllowedAndReplicationClusters() {
+        if (this.allowed_clusters.isEmpty()) {
+            return true;
+        }
+        for (String replicationCluster : this.replication_clusters) {
+            if (!this.allowed_clusters.contains(replicationCluster)) {
+                return false;
+            }
+        }
+        return true;
+    }
 }


Reply via email to