This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e57827b333 [improve][admin] PIP-422 part 1: Support global 
topic-level replicated clusters policy (#24390)
1e57827b333 is described below

commit 1e57827b33395a2124593d95fa7641ee5e125d9b
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 4 10:16:39 2025 +0800

    [improve][admin] PIP-422 part 1: Support global topic-level replicated 
clusters policy (#24390)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  54 ++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  67 +++---
 .../SystemTopicBasedTopicPoliciesService.java      |  35 +++-
 .../broker/service/TopicPoliciesService.java       |   9 +
 .../broker/service/persistent/PersistentTopic.java |  48 ++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  39 +++-
 .../broker/service/OneWayReplicatorTest.java       | 233 ++++++++++++++++++---
 .../broker/service/OneWayReplicatorTestBase.java   |  49 ++++-
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |  61 +++++-
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 207 ++++++++++++++++++
 .../apache/pulsar/client/admin/TopicPolicies.java  |  10 +
 .../client/admin/internal/TopicPoliciesImpl.java   |  31 +++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  65 ++++++
 13 files changed, 813 insertions(+), 95 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0e8509db186..459d82cede8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3360,7 +3360,7 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
+    protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds, boolean isGlobal) {
         if (CollectionUtils.isEmpty(clusterIds)) {
             return CompletableFuture.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
                     "ClusterIds should not be null or empty"));
@@ -3373,6 +3373,47 @@ public class PersistentTopicsBase extends AdminResource {
                                 "Cannot specify global in the list of 
replication clusters");
                     }
                 })
+                .thenCompose(__ -> {
+                    // Set a topic-level replicated clusters that do not 
contain local cluster is not meaningful, except
+                    // the following scenario: User has two clusters, which 
enabled Geo-Replication through a global
+                    // metadata store, the resources named partitioned topic 
metadata and the resource namespace-level
+                    // "replicated clusters" are shared between multi 
clusters. Pulsar can hardly delete a specify
+                    // partitioned topic. To support this use case, the 
following steps can implement it:
+                    // 1. set a global topic-level replicated clusters that do 
not contain local cluster.
+                    // 2. the local cluster will remove the subtopics 
automatically, and remove the schemas and local
+                    //    topic policies. Just leave the global topic policies 
there, which prevents the namespace level
+                    //    replicated clusters policy taking affect.
+                    boolean clustersDoesNotContainsLocal = 
CollectionUtils.isEmpty(clusterIds)
+                            || 
!clusterIds.contains(pulsar().getConfig().getClusterName());
+                    if (clustersDoesNotContainsLocal && !isGlobal) {
+                        return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                "Can not remove local cluster from the local 
topic-level replication clusters policy"));
+                    }
+                    if (isGlobal) {
+                        return 
getNamespacePoliciesAsync(namespaceName).thenCompose(v -> {
+                            // Since global policies depends on namespace 
level replication, users only can set global
+                            // policies when namespace level replication 
exists. Otherwise, the policies will never be
+                            // copied to the remote side, which is meaningless.
+                            if (v == null || v.replication_clusters.size() < 
2) {
+                                return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                    "Please do not use the global topic level 
policy when namespace-level replication"
+                                    + " is not enabled, because the global 
level policy relies on namespace-level"
+                                    + " replication"));
+                            }
+                            for (String clusterId : clusterIds) {
+                                if 
(v.replication_clusters.contains(clusterId)) {
+                                    continue;
+                                }
+                                return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                    "The policies at the global topic level 
will only be copied to the clusters"
+                                    + " included in the namespace level 
replication. Therefore, please do not set the"
+                                    + " policies including other clusters"));
+                            }
+                            return CompletableFuture.completedFuture(null);
+                        });
+                    }
+                    return CompletableFuture.completedFuture(null);
+                })
                 .thenCompose(__ -> clustersAsync())
                 .thenCompose(clusters -> {
                     List<CompletableFuture<Void>> futures = new 
ArrayList<>(replicationClusters.size());
@@ -3405,9 +3446,10 @@ public class PersistentTopicsBase extends AdminResource {
                                     topicMetaOp.get().partitions).values());
                         });
                 }).thenCompose(__ ->
-                    getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op 
-> {
+                    getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal).thenCompose(op -> {
                             TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
                             topicPolicies.setReplicationClusters(clusterIds);
+                            topicPolicies.setIsGlobal(isGlobal);
                             return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
                                     .thenRun(() -> {
                                         log.info("[{}] Successfully set 
replication clusters for namespace={}, "
@@ -3421,12 +3463,16 @@ public class PersistentTopicsBase extends AdminResource 
{
                 ));
     }
 
-    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+    protected CompletableFuture<Void> 
internalRemoveReplicationClusters(boolean isGlobal) {
         return validatePoliciesReadOnlyAccessAsync()
-                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
                 .thenCompose(op -> {
+                    if (op.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
                     TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
                     topicPolicies.setReplicationClusters(null);
+                    topicPolicies.setIsGlobal(isGlobal);
                     return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
                             .thenRun(() -> {
                                 log.info("[{}] Successfully set replication 
clusters for namespace={}, "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ac38bd67ee7..c539dad7426 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2304,6 +2304,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                               @PathParam("tenant") String tenant,
                               @PathParam("namespace") String namespace,
                               @PathParam("topic") @Encoded String encodedTopic,
+                              @QueryParam("isGlobal") @DefaultValue("false") 
boolean isGlobal,
                               @QueryParam("applied") @DefaultValue("false") 
boolean applied,
                               @ApiParam(value = "Whether leader broker 
redirected this call to this broker. "
                                       + "For internal use.")
@@ -2311,21 +2312,48 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         validateTopicName(tenant, namespace, encodedTopic);
         validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.READ)
                 .thenCompose(__ -> preValidation(authoritative))
-                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
-                .thenAccept(op -> {
-                    
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
 -> {
-                        if (applied) {
-                            return 
getNamespacePolicies(namespaceName).replication_clusters;
+                .thenCompose(__ -> {
+                    if (applied) {
+                        return getAppliedReplicatedClusters();
+                    }
+                    return getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal).thenApply(policy -> {
+                        if (policy != null && policy.isPresent()
+                                && 
CollectionUtils.isNotEmpty(policy.get().getReplicationClustersSet())) {
+                            return policy.get().getReplicationClustersSet();
                         }
                         return null;
-                    }));
+                    });
                 })
+                .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     handleTopicPolicyException("getReplicationClusters", ex, 
asyncResponse);
                     return null;
                 });
     }
 
+    private CompletableFuture<Set<String>> getAppliedReplicatedClusters() {
+        return getTopicPoliciesAsyncWithRetry(topicName, false)
+            .thenCompose(localPolicy -> {
+                if (localPolicy != null && localPolicy.isPresent()
+                        && 
CollectionUtils.isNotEmpty(localPolicy.get().getReplicationClustersSet())) {
+                    return 
CompletableFuture.completedFuture(localPolicy.get().getReplicationClustersSet());
+                }
+                return getTopicPoliciesAsyncWithRetry(topicName, true)
+                    .thenCompose(globalPolicy -> {
+                        if (globalPolicy != null && globalPolicy.isPresent()
+                                && 
CollectionUtils.isNotEmpty(globalPolicy.get().getReplicationClustersSet())) {
+                            return 
CompletableFuture.completedFuture(globalPolicy.get().getReplicationClustersSet());
+                        }
+                        return 
getNamespacePoliciesAsync(namespaceName).thenApply(v -> {
+                            if (v != null) {
+                                return v.replication_clusters;
+                            }
+                            return null;
+                        });
+                    });
+            });
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/replication")
     @ApiOperation(value = "Set the replication clusters for a topic.")
@@ -2341,32 +2369,14 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "List of replication clusters", required = true) 
List<String> clusterIds) {
         validateTopicName(tenant, namespace, encodedTopic);
         validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE)
-                .thenCompose(__ -> 
preValidation(authoritative)).thenCompose(__ -> {
-                    // Set a topic-level replicated clusters that do not 
contain local cluster is not meaningful, except
-                    // the following scenario: User has two clusters, which 
enabled Geo-Replication through a global
-                    // metadata store, the resources named partitioned topic 
metadata and the resource namespace-level
-                    // "replicated clusters" are shared between multi 
clusters. Pulsar can hardly delete a specify
-                    // partitioned topic. To support this use case, the 
following steps can implement it:
-                    // 1. set a global topic-level replicated clusters that do 
not contain local cluster.
-                    // 2. the local cluster will remove the subtopics 
automatically, and remove the schemas and local
-                    //    topic policies. Just leave the global topic policies 
there, which prevents the namespace level
-                    //    replicated clusters policy taking affect.
-                    // TODO But the API "pulsar-admin topics 
set-replication-clusters" does not support global policy,
-                    //   to support this scenario, a PIP is needed.
-                    boolean clustersDoesNotContainsLocal = 
CollectionUtils.isEmpty(clusterIds)
-                            || 
!clusterIds.contains(pulsar().getConfig().getClusterName());
-                    if (clustersDoesNotContainsLocal) {
-                        return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
-                            "Can not remove local cluster from the topic-level 
replication clusters policy"));
-                    }
-                    return CompletableFuture.completedFuture(null);
-                })
-                .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> internalSetReplicationClusters(clusterIds, 
isGlobal))
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
                     handleTopicPolicyException("setReplicationClusters", ex, 
asyncResponse);
@@ -2387,12 +2397,13 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     public void removeReplicationClusters(@Suspended final AsyncResponse 
asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE)
                 .thenCompose(__ -> preValidation(authoritative))
-                .thenCompose(__ -> internalRemoveReplicationClusters())
+                .thenCompose(__ -> internalRemoveReplicationClusters(isGlobal))
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
                     handleTopicPolicyException("removeReplicationClusters", 
ex, asyncResponse);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4b833502095..4e858fc91a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -139,6 +139,17 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName) {
+        return deleteTopicPoliciesAsync(topicName, false);
+    }
+
+    /**
+     * @param keepGlobalPolicies only be used when a topic was deleted because 
users removes current
+     *    cluster from the policy "replicatedClusters".
+     *    See also https://github.com/apache/pulsar/blob/master/pip/pip-422.md
+     */
+    @Override
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName,
+                                                            boolean 
keepGlobalPolicies) {
         if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) || 
isSelf(topicName)) {
             return CompletableFuture.completedFuture(null);
         }
@@ -167,7 +178,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 log.info("Skip delete topic-level policies because {} has been 
removed before", changeEvents);
                 return CompletableFuture.completedFuture(null);
             }
-            return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+            return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
+                    keepGlobalPolicies);
         });
     }
 
@@ -177,11 +189,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             return CompletableFuture.failedFuture(new 
BrokerServiceException.NotAllowedException(
                     "Not allowed to update topic policy for the heartbeat 
topic"));
         }
-        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
+        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, 
false);
     }
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
-                                                         @Nullable 
TopicPolicies policies) {
+         @Nullable TopicPolicies policies, boolean 
keepGlobalPoliciesAfterDeleting) {
         return pulsarService.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
                 .thenCompose(namespacePolicies -> {
@@ -203,7 +215,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     result.completeExceptionally(cause);
                                 } else {
                                     CompletableFuture<MessageId> writeFuture =
-                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
+                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
+                                                    
keepGlobalPoliciesAfterDeleting);
                                     writeFuture.whenComplete((messageId, e) -> 
{
                                         if (e != null) {
                                             result.completeExceptionally(e);
@@ -223,14 +236,20 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private CompletableFuture<MessageId> 
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
-                                      SystemTopicClient.Writer<PulsarEvent> 
writer,
-                                      @Nullable TopicPolicies policies) {
+          SystemTopicClient.Writer<PulsarEvent> writer, @Nullable 
TopicPolicies policies,
+          boolean keepGlobalPoliciesAfterDeleting) {
         PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
         if (!ActionType.DELETE.equals(actionType)) {
             return writer.writeAsync(getEventKey(event, policies != null && 
policies.isGlobalPolicies()), event);
         }
         // When a topic is deleting, delete both non-global and global 
topic-level policies.
-        CompletableFuture<MessageId> deletePolicies = 
writer.deleteAsync(getEventKey(event, true), event)
+        CompletableFuture<MessageId> dealWithGlobalPolicy;
+        if (keepGlobalPoliciesAfterDeleting) {
+            dealWithGlobalPolicy = CompletableFuture.completedFuture(null);
+        } else {
+            dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event, 
true), event);
+        }
+        CompletableFuture<MessageId> deletePolicies = dealWithGlobalPolicy
             .thenCompose(__ -> {
                 return writer.deleteAsync(getEventKey(event, false), event);
             });
@@ -620,7 +639,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer -> {
-                        sendTopicPolicyEventInternal(topicName, 
ActionType.DELETE, writer, event.getPolicies())
+                        sendTopicPolicyEventInternal(topicName, 
ActionType.DELETE, writer, event.getPolicies(), false)
                             .whenComplete((result, e) -> writer.closeAsync()
                             .whenComplete((res, ex) -> {
                                 if (ex != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index ec5da7995bf..02303fd9c4a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Topic policies service.
@@ -38,6 +40,8 @@ public interface TopicPoliciesService extends AutoCloseable {
 
     String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__";
 
+    Logger LOG = LoggerFactory.getLogger(TopicPoliciesService.class);
+
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
 
     /**
@@ -47,6 +51,11 @@ public interface TopicPoliciesService extends AutoCloseable {
      */
     CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName);
 
+    default CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName,
+                                                             boolean 
keepGlobalPoliciesAfterDeleting) {
+        return deleteTopicPoliciesAsync(topicName);
+    }
+
     /**
      * Update policies for a topic asynchronously.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5602cd67951..d8cbc0cb453 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1929,7 +1929,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return CompletableFuture.completedFuture(null);
         }
 
-        String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+        final String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
         return checkAllowedCluster(localCluster).thenCompose(success -> {
             if (!success) {
@@ -1980,7 +1980,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         });
     }
 
+    /**
+     * There are only one cases that will remove local clusters: using global 
metadata store, which means that
+     * namespaces will share policies cross multi clusters, including 
"replicated clusters" and "partitioned topic
+     * metadata", we can hardly delete partitioned topic from one cluster and 
keep it exists in another.
+     * Users removes local cluster "replicated clusters" to delete topic from 
one of clusters.
+     */
     CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
+        final String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
         TopicName tName = TopicName.get(topic);
         if (!tName.isPartitioned()) {
             return CompletableFuture.completedFuture(null);
@@ -2018,15 +2025,36 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                             }
 
                                     });
-                                    // There are only one cases that will 
remove local clusters: using global metadata
-                                    // store, namespaces will share policies 
cross multi clusters, including
-                                    // "replicated clusters" and "partitioned 
topic metadata", we can hardly delete
-                                    // partitioned topic from one cluster and 
keep it exists in another. Removing
-                                    // local cluster from the namespace level 
"replicated clusters" can do this.
-                                    // TODO: there is no way to delete a 
specify partitioned topic if users have enabled
-                                    //  Geo-Replication with a global metadata 
store, a PIP is needed.
-                                    // Since the system topic 
"__change_events" under the namespace will also be
-                                    // deleted, we can skip to delete 
topic-level policies.
+
+                                    // Two cases that can run up to here:
+                                    // 1. Namespace level removing local 
cluster: all topic policies will be removed
+                                    // when the system topic "__change_events" 
is deleting.
+                                    // 2. Global topic level removing local 
cluster: we need to remove local topic-level
+                                    // policies here, but leave global 
topic-level policies there to avoid the namespace
+                                    // level policies take effect, whose 
policies still contains local cluster.
+                                    boolean changeEventsAlsoBeingDeleted = 
!topicPolicies.getReplicationClusters()
+                                            
.getNamespaceValue().contains(localCluster);
+                                    if (changeEventsAlsoBeingDeleted) {
+                                        log.info("Skip to deleted topic 
policies[{}] after all partitions[{}] were"
+                                                + " removed because the system 
topic __change_events will be removed.",
+                                                partitionedName, 
metadataOp.get().partitions);
+                                        return;
+                                    }
+                                    
brokerService.getPulsar().getTopicPoliciesService()
+                                        
.deleteTopicPoliciesAsync(partitionedName, true)
+                                            .whenComplete((__, ex) -> {
+                                            if (ex == null) {
+                                                log.info("Deleted topic 
policies[{}] after all partitions[{}] were"
+                                                    + " removed because the 
current cluster has bee removed from"
+                                                    + " topic policies. Global 
policies will not be deleted.",
+                                                    partitionedName, 
metadataOp.get().partitions);
+                                            } else {
+                                                log.error("Failed to delete 
topic policies[{}] after all partitions[{}]"
+                                                    + " were removed,  when 
the current cluster has bee removed from"
+                                                    + " topic policies",
+                                                    partitionedName, 
metadataOp.get().partitions, ex);
+                                            }
+                                    });
                                 }
                             }
                         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 08c86f62de3..1d911c52c83 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -4070,19 +4071,51 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(dataProvider = "topicTypes")
     public void testRemoveLocalCluster(TopicType topicType) throws Exception {
-        String topic = "persistent://" + myNamespace + 
"/testSetSubRateWithSub";
+        final String cluster = pulsar.getConfig().getClusterName();
+        final String topic = "persistent://" + myNamespace + 
"/testSetSubRateWithSub";
         if (TopicType.PARTITIONED.equals(topicType)) {
             admin.topics().createNonPartitionedTopic(topic);
         } else {
             admin.topics().createPartitionedTopic(topic, 2);
         }
+        // Can not remove current cluster by local topic-level policies.
         try {
             admin.topics().setReplicationClusters(topic, 
Arrays.asList("not-local-cluster"));
             fail("Can not remove local cluster from the topic-level 
replication clusters policy");
         } catch (PulsarAdminException.PreconditionFailedException e) {
-            assertTrue(e.getMessage().contains("Can not remove local cluster 
from the topic-level replication clusters"
-                + " policy"));
+            assertTrue(e.getMessage().contains("Can not remove local cluster 
from the local topic-level replication"
+                + " clusters policy"));
         }
+        try {
+            admin.topicPolicies(false).setReplicationClusters(topic, 
Arrays.asList("not-local-cluster")).get();
+            fail("Can not remove local cluster from the topic-level 
replication clusters policy");
+        } catch (Exception e) {
+            Throwable actEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(actEx.getMessage().contains("Can not remove local 
cluster from the local topic-level replication"
+                    + " clusters policy"));
+        }
+        // Can not use the global topic level policy when namespace-level 
replication is not enabled.
+        try {
+            admin.topicPolicies(true).setReplicationClusters(topic, 
Arrays.asList("not-local-cluster")).get();
+            fail("Please do not use the global topic level policy when 
namespace-level replication is not enabled,"
+                + " because the global level policy relies on namespace-level 
replication");
+        } catch (Exception e) {
+            Throwable actEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(actEx.getMessage().contains("namespace-level 
replication"));
+        }
+        // Can not set global topic level clusters that does not exist in 
namespace-level replication.
+        admin.namespaces().setNamespaceReplicationClusters(myNamespace, 
Collections.singleton(cluster));
+        try {
+            admin.topicPolicies(true)
+                    .setReplicationClusters(topic, 
Arrays.asList("not-local-cluster", cluster)).get();
+            fail("The policies at the global topic level will only be copied 
to the clusters included in the namespace"
+                    + " level replication. Therefore, please do not set the 
policies at the global topic level to"
+                    + " other clusters");
+        } catch (Exception e) {
+            Throwable actEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(actEx.getMessage().contains("namespace-level 
replication"));
+        }
+
         // cleanup.
         if (TopicType.PARTITIONED.equals(topicType)) {
             admin.topics().delete(topic, false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b427adbc550..de6c5f9f356 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -70,6 +70,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
@@ -138,34 +139,6 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         super.cleanup();
     }
 
-    private void waitReplicatorStopped(String topicName) {
-        Awaitility.await().untilAsserted(() -> {
-            Optional<Topic> topicOptional2 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
-            assertTrue(topicOptional2.isPresent());
-            PersistentTopic persistentTopic2 = (PersistentTopic) 
topicOptional2.get();
-            assertTrue(persistentTopic2.getProducers().isEmpty());
-            Optional<Topic> topicOptional1 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
-            assertTrue(topicOptional1.isPresent());
-            PersistentTopic persistentTopic1 = (PersistentTopic) 
topicOptional2.get();
-            assertTrue(persistentTopic1.getReplicators().isEmpty()
-                    || 
!persistentTopic1.getReplicators().get(cluster2).isConnected());
-        });
-    }
-
-    /**
-     * Override "AbstractReplicator.producer" by {@param producer} and return 
the original value.
-     */
-    private ProducerImpl overrideProducerForReplicator(AbstractReplicator 
replicator, ProducerImpl newProducer)
-            throws Exception {
-        Field producerField = 
AbstractReplicator.class.getDeclaredField("producer");
-        producerField.setAccessible(true);
-        ProducerImpl originalValue = (ProducerImpl) 
producerField.get(replicator);
-        synchronized (replicator) {
-            producerField.set(replicator, newProducer);
-        }
-        return originalValue;
-    }
-
     @Test(timeOut = 45 * 1000)
     public void testReplicatorProducerStatInTopic() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
@@ -224,6 +197,198 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
+    @Test
+    public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_123");
+        final String subTopic = 
TopicName.get(topicName).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(topicName, 1);
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        producer1.close();
+        waitReplicatorStarted(subTopic, pulsar2);
+        Set<String> clustersApplied = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+        assertTrue(clustersApplied.contains(cluster1));
+        assertTrue(clustersApplied.contains(cluster2));
+
+        // Remove topic from a cluster.
+        admin1.topicPolicies(true).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1.contains(cluster1));
+            assertFalse(clustersApplied1.contains(cluster2));
+            Set<String> clustersApplied2 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied2.contains(cluster1));
+            assertFalse(clustersApplied2.contains(cluster2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global1);
+            assertTrue(global1.contains(cluster1));
+            assertFalse(global1.contains(cluster2));
+
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global2);
+            assertTrue(global2.contains(cluster1));
+            assertFalse(global2.contains(cluster2));
+        });
+        waitReplicatorStopped(subTopic, true);
+
+        // Remove global policy.
+        admin1.topicPolicies(true).removeReplicationClusters(topicName);
+        Producer<byte[]> producer2 = 
client1.newProducer().topic(topicName).create();
+        producer2.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1.contains(cluster1));
+            assertTrue(clustersApplied1.contains(cluster2));
+            Set<String> clustersApplied2 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2.contains(cluster1));
+            assertTrue(clustersApplied2.contains(cluster2));
+
+            Set<String> clusters1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(clusters1));
+            Set<String> clusters2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(clusters2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+
+        admin1.topics().unload(subTopic);
+        admin2.topics().unload(subTopic);
+    }
+
+    /**
+     * Test: policies overwrite and applied policies.
+     */
+    @Test
+    public void testPoliciesOverWrite() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_123");
+        final String subTopic = 
TopicName.get(topicName).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(topicName, 1);
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        producer1.close();
+        waitReplicatorStarted(subTopic, pulsar2);
+        Set<String> clustersApplied1 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+        assertTrue(clustersApplied1.contains(cluster1));
+        assertTrue(clustersApplied1.contains(cluster2));
+        // Set clusters for cluster2 to avoid topic deleting. This feature is 
needed for the following situation,
+        // - There are 3 clusters using shared metadata store
+        // - The user want to delete topic on the cluster "c2", and to stop 
replication on the cluster "c3 -> c1"
+        // - The user will do the following configurations
+        //    - Set a global policy: [c1, c3].
+        //    - Set a local policy for the cluster "c3": [c3].
+        Awaitility.await().untilAsserted(() -> {
+            admin2.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster2));
+            Set<String> clustersApplied2 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2.contains(cluster1));
+            assertTrue(clustersApplied2.contains(cluster2));
+        });
+
+        // Cluster1: Global policy overwrite namespace policy.
+        // Cluster2: Global policy never overwrite namespace policy.
+        admin1.topicPolicies(true).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied10 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied10.contains(cluster1));
+            assertFalse(clustersApplied10.contains(cluster2));
+            Set<String> clustersApplied20 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied20.contains(cluster1));
+            assertTrue(clustersApplied20.contains(cluster2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global1);
+            assertTrue(global1.contains(cluster1));
+            assertFalse(global1.contains(cluster2));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global2);
+            assertTrue(global2.contains(cluster1));
+            assertFalse(global2.contains(cluster2));
+        });
+        waitReplicatorStopped(subTopic, false);
+
+        // Remove global policy.
+        admin1.topicPolicies(true).removeReplicationClusters(topicName);
+        Producer<byte[]> producer2 = 
client1.newProducer().topic(topicName).create();
+        producer2.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied10 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied10.contains(cluster1));
+            assertTrue(clustersApplied10.contains(cluster2));
+            Set<String> clustersApplied20 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied20.contains(cluster1));
+            assertTrue(clustersApplied20.contains(cluster2));
+
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+
+        // Cluster1: Local policy overwrite namespace policy.
+        // Cluster2: Global policy never overwrite namespace policy.
+        admin1.topicPolicies(false).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Producer<byte[]> producer3 = 
client1.newProducer().topic(topicName).create();
+        producer3.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied10 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied10.contains(cluster1));
+            assertFalse(clustersApplied10.contains(cluster2));
+            Set<String> clustersApplied20 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied20.contains(cluster1));
+            assertTrue(clustersApplied20.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local1));
+            assertTrue(local1.contains(cluster1));
+            assertFalse(local1.contains(cluster2));
+
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+            assertFalse(local2.contains(cluster1));
+        });
+        waitReplicatorStopped(subTopic, false);
+
+        // Remove local policy.
+        admin1.topicPolicies(false).removeReplicationClusters(topicName);
+        Producer<byte[]> producer4 = 
client1.newProducer().topic(topicName).create();
+        producer4.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+
+        admin1.topics().unload(subTopic);
+        admin2.topics().unload(subTopic);
+    }
+
     @Test(timeOut = 45 * 1000)
     public void testCreateRemoteConsumerFirst() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
@@ -549,8 +714,8 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         assertEquals(topicMetadata2.partitions, 2);
         // cleanup.
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
-        waitReplicatorStopped(partition0);
-        waitReplicatorStopped(partition1);
+        waitReplicatorStopped(partition0, false);
+        waitReplicatorStopped(partition1, false);
         admin1.topics().deletePartitionedTopic(topicName);
         admin2.topics().deletePartitionedTopic(topicName);
     }
@@ -600,8 +765,8 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
         // cleanup.
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
-        waitReplicatorStopped(partition0);
-        waitReplicatorStopped(partition1);
+        waitReplicatorStopped(partition0, false);
+        waitReplicatorStopped(partition1, false);
         admin1.topics().deletePartitionedTopic(topicName);
         admin2.topics().deletePartitionedTopic(topicName);
     }
@@ -1325,7 +1490,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         // cleanup.
         taskToClearInjection.run();
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
-        waitReplicatorStopped(topicName);
+        waitReplicatorStopped(topicName, false);
         admin1.topics().delete(topicName, false);
         admin2.topics().delete(topicName, false);
     }
@@ -1637,7 +1802,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         // cleanup.
         producer1.close();
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
-        waitReplicatorStopped(topicName);
+        waitReplicatorStopped(topicName, false);
         admin1.topics().delete(topicName, false);
         if (originalReplClient2 == null) {
             
pulsar1.getBrokerService().getReplicationClients().remove(cluster2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 82c6267c153..58d8bcf7045 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.net.URL;
 import java.time.Duration;
 import java.util.Arrays;
@@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -371,14 +373,59 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     }
 
     protected void waitReplicatorStarted(String topicName) {
+        waitReplicatorStarted(topicName, pulsar2);
+    }
+
+    protected void waitReplicatorStarted(String topicName, PulsarService 
remoteCluster) {
         Awaitility.await().untilAsserted(() -> {
-            Optional<Topic> topicOptional2 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
+            Optional<Topic> topicOptional2 = 
remoteCluster.getBrokerService().getTopic(topicName, false).get();
             assertTrue(topicOptional2.isPresent());
             PersistentTopic persistentTopic2 = (PersistentTopic) 
topicOptional2.get();
             assertFalse(persistentTopic2.getProducers().isEmpty());
         });
     }
 
+    protected void waitReplicatorStopped(String topicName, boolean 
remoteTopicExpectedDeleted) {
+        waitReplicatorStopped(topicName, pulsar1, pulsar2, 
remoteTopicExpectedDeleted);
+    }
+
+    protected void waitReplicatorStopped(String topicName, PulsarService 
sourceCluster, PulsarService remoteCluster,
+                                         boolean remoteTopicExpectedDeleted) {
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> remoteTp = 
remoteCluster.getBrokerService().getTopic(topicName, false).get();
+            if (remoteTopicExpectedDeleted) {
+                assertTrue(remoteTp.isEmpty());
+            } else {
+                assertTrue(remoteTp.isPresent());
+            }
+            if (remoteTp.isPresent()) {
+                PersistentTopic remoteTp1 = (PersistentTopic) remoteTp.get();
+                for (org.apache.pulsar.broker.service.Producer p : 
remoteTp1.getProducers().values()) {
+                    
assertFalse(p.getProducerName().startsWith(remoteCluster.getConfig().getReplicatorPrefix()));
+                }
+            }
+            Optional<Topic> sourceTp = 
sourceCluster.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(sourceTp.isPresent());
+            PersistentTopic sourceTp1 = (PersistentTopic) sourceTp.get();
+            assertTrue(sourceTp1.getReplicators().isEmpty()
+                    || 
!sourceTp1.getReplicators().get(remoteCluster.getConfig().getClusterName()).isConnected());
+        });
+    }
+
+    /**
+     * Override "AbstractReplicator.producer" by {@param producer} and return 
the original value.
+     */
+    protected ProducerImpl overrideProducerForReplicator(AbstractReplicator 
replicator, ProducerImpl newProducer)
+            throws Exception {
+        Field producerField = 
AbstractReplicator.class.getDeclaredField("producer");
+        producerField.setAccessible(true);
+        ProducerImpl originalValue = (ProducerImpl) 
producerField.get(replicator);
+        synchronized (replicator) {
+            producerField.set(replicator, newProducer);
+        }
+        return originalValue;
+    }
+
     protected PulsarClient initClient(ClientBuilder clientBuilder) throws 
Exception {
         return clientBuilder.build();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 2030a22d7e0..a0099f97fea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -78,6 +79,18 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         super.testReplicatorProducerStatInTopic();
     }
 
+    @Override
+    @Test(enabled = false)
+    public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+        super.testDeleteRemoteTopicByGlobalPolicy();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPoliciesOverWrite() throws Exception {
+        super.testPoliciesOverWrite();
+    }
+
     @Override
     @Test(enabled = false)
     public void testCreateRemoteConsumerFirst() throws Exception {
@@ -180,8 +193,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         super.testNonPersistentReplicatorQueueSize();
     }
 
-    @Test(timeOut = 60_000)
-    public void testRemoveCluster() throws Exception {
+    @DataProvider
+    public Object[][] removeClusterLevels() {
+        return new Object[][] {
+            {"namespace"},
+            {"topic"}
+        };
+    }
+
+    @Test(timeOut = 60_000, dataProvider = "removeClusterLevels")
+    public void testRemoveCluster(String removeClusterLevel) throws Exception {
         // Initialize.
         final String ns1 = defaultTenant + "/" + 
"ns_73b1a31afce34671a5ddc48fe5ad7fc8";
         final String topic = "persistent://" + ns1 + 
"/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
@@ -235,15 +256,27 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
 
         // The topics under the namespace of the cluster-1 will be deleted.
         // Verify the result.
-        admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
+        if ("namespace".equals(removeClusterLevel)) {
+            admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
+        } else {
+            admin1.topicPolicies(true).setReplicationClusters(topic, 
Arrays.asList(cluster2));
+            admin2.topicPolicies(true).setReplicationClusters(topic, 
Arrays.asList(cluster2));
+        }
         
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(()
 -> {
             Map<String, CompletableFuture<Optional<Topic>>> tps = 
pulsar1.getBrokerService().getTopics();
             assertFalse(tps.containsKey(topicP0));
             assertFalse(tps.containsKey(topicP1));
-            assertFalse(tps.containsKey(topicChangeEvents));
-            assertFalse(pulsar1.getNamespaceService()
-                    .checkTopicExistsAsync(TopicName.get(topicChangeEvents))
-                    .get(5, TimeUnit.SECONDS).isExists());
+            if ("namespace".equals(removeClusterLevel)) {
+                assertFalse(tps.containsKey(topicChangeEvents));
+                assertFalse(pulsar1.getNamespaceService()
+                        
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
+                        .get(5, TimeUnit.SECONDS).isExists());
+            } else {
+                assertTrue(tps.containsKey(topicChangeEvents));
+                assertTrue(pulsar1.getNamespaceService()
+                        
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
+                        .get(5, TimeUnit.SECONDS).isExists());
+            }
             // Verify: schema will be removed in local cluster, and remote 
cluster will not.
             List<CompletableFuture<StoredSchema>> schemaList13 =
                     
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
@@ -252,6 +285,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
                     
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
             assertEquals(schemaList23.size(), 1);
             // Verify: the topic policies will be removed in local cluster, 
but remote cluster will not.
+            if ("topic".equals(removeClusterLevel)) {
+                Optional<TopicPolicies> localPolicies1 = 
pulsar1.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(topic), 
LOCAL_ONLY).join();
+                assertTrue(localPolicies1.isEmpty(), "Local cluster should 
have deleted local policies.");
+                Optional<TopicPolicies> globalPolicies1 = 
pulsar1.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(topic), 
GLOBAL_ONLY).join();
+                assertTrue(globalPolicies1.isPresent(), "Local cluster should 
have global policies.");
+                assertEquals(globalPolicies1.get().getPublishRate(), 
publishRateAddGlobal,
+                        "Remote cluster should have global policies: publish 
rate.");
+            }
             Optional<TopicPolicies> globalPolicies2 = 
pulsar2.getTopicPoliciesService()
                     .getTopicPoliciesAsync(TopicName.get(topic), 
GLOBAL_ONLY).join();
             assertTrue(globalPolicies2.isPresent(), "Remote cluster should 
have global policies.");
@@ -265,7 +308,11 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         });
 
         // cleanup.
+        if ("topic".equals(removeClusterLevel)) {
+            admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
+        }
         admin2.topics().deletePartitionedTopic(topic);
+        assertEquals(admin2.topics().getList(ns1).size(), 0);
         admin2.namespaces().deleteNamespace(ns1);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 49676dc8a08..e14cc5045d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -27,10 +27,12 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -65,6 +67,211 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.testReplicatorProducerStatInTopic();
     }
 
+    @Override
+    @Test
+    public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_123");
+        final String subTopic = 
TopicName.get(topicName).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(topicName, 1);
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        producer1.close();
+        waitReplicatorStarted(subTopic, pulsar2);
+        waitReplicatorStarted(subTopic, pulsar1);
+        Set<String> clustersApplied1 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+        assertTrue(clustersApplied1.contains(cluster1));
+        assertTrue(clustersApplied1.contains(cluster2));
+        Set<String> clustersApplied2 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+        assertTrue(clustersApplied2.contains(cluster1));
+        assertTrue(clustersApplied2.contains(cluster2));
+
+        // Remove topic from a cluster.
+        admin1.topicPolicies(true).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertFalse(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied2a.contains(cluster1));
+            assertFalse(clustersApplied2a.contains(cluster2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global1);
+            assertTrue(global1.contains(cluster1));
+            assertFalse(global1.contains(cluster2));
+
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global2);
+            assertTrue(global2.contains(cluster1));
+            assertFalse(global2.contains(cluster2));
+        });
+        waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
+
+        // Remove global policy.
+        admin1.topicPolicies(true).removeReplicationClusters(topicName);
+        Producer<byte[]> producer2 = 
client1.newProducer().topic(topicName).create();
+        producer2.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertTrue(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied2a.contains(cluster1));
+            assertTrue(clustersApplied2a.contains(cluster2));
+
+            Set<String> clusters1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(clusters1));
+            Set<String> clusters2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(clusters2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+        waitReplicatorStarted(subTopic, pulsar1);
+
+        admin1.topics().unload(subTopic);
+        admin2.topics().unload(subTopic);
+    }
+
+    @Override
+    @Test
+    public void testPoliciesOverWrite() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_123");
+        final String subTopic = 
TopicName.get(topicName).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(topicName, 1);
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        producer1.close();
+        waitReplicatorStarted(subTopic, pulsar2);
+        Set<String> clustersApplied1 = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+        assertTrue(clustersApplied1.contains(cluster1));
+        assertTrue(clustersApplied1.contains(cluster2));
+        // Set clusters for cluster2 to avoid topic deleting. This feature is 
needed for the following situation,
+        // - There are 3 clusters using shared metadata store
+        // - The user want to delete topic on the cluster "c2", and to stop 
replication on the cluster "c3 -> c1"
+        // - The user will do the following configurations
+        //    - Set a global policy: [c1, c3].
+        //    - Set a local policy for the cluster "c3": [c3].
+        admin2.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster2));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied2 = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2.contains(cluster1));
+            assertTrue(clustersApplied2.contains(cluster2));
+        });
+
+
+        // Cluster1: Global policy overwrite namespace policy.
+        // Cluster2: Global policy never overwrite namespace policy.
+        admin1.topicPolicies(true).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertFalse(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2a.contains(cluster1));
+            assertTrue(clustersApplied2a.contains(cluster2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global1);
+            assertTrue(global1.contains(cluster1));
+            assertFalse(global1.contains(cluster2));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertNotNull(global2);
+            assertTrue(global2.contains(cluster1));
+            assertFalse(global2.contains(cluster2));
+        });
+        waitReplicatorStopped(subTopic, pulsar1, pulsar2, false);
+        waitReplicatorStopped(subTopic, pulsar2, pulsar1, false);
+
+        // Remove global policy.
+        admin1.topicPolicies(true).removeReplicationClusters(topicName);
+        Producer<byte[]> producer2 = 
client1.newProducer().topic(topicName).create();
+        producer2.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertTrue(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2a.contains(cluster1));
+            assertTrue(clustersApplied2a.contains(cluster2));
+
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+
+        // Cluster1: Local policy overwrite namespace policy.
+        // Cluster2: Global policy never overwrite namespace policy.
+        admin1.topicPolicies(false).setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        Producer<byte[]> producer3 = 
client1.newProducer().topic(topicName).create();
+        producer3.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertFalse(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2a.contains(cluster1));
+            assertTrue(clustersApplied2a.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local1));
+            assertTrue(local1.contains(cluster1));
+            assertFalse(local1.contains(cluster2));
+
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+            assertFalse(local2.contains(cluster1));
+        });
+        waitReplicatorStopped(subTopic, false);
+
+        // Remove local policy.
+        admin1.topicPolicies(false).removeReplicationClusters(topicName);
+        Producer<byte[]> producer4 = 
client1.newProducer().topic(topicName).create();
+        producer4.close();
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> clustersApplied1a = 
admin1.topicPolicies().getReplicationClusters(topicName, true);
+            assertTrue(clustersApplied1a.contains(cluster1));
+            assertTrue(clustersApplied1a.contains(cluster2));
+            Set<String> clustersApplied2a = 
admin2.topicPolicies().getReplicationClusters(topicName, true);
+            assertFalse(clustersApplied2a.contains(cluster1));
+            assertTrue(clustersApplied2a.contains(cluster2));
+
+            Set<String> local1 = 
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(local1));
+            Set<String> local2 = 
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isNotEmpty(local2));
+            assertTrue(local2.contains(cluster2));
+
+            Set<String> global1 = 
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global1));
+            Set<String> global2 = 
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+            assertTrue(CollectionUtils.isEmpty(global2));
+        });
+        waitReplicatorStarted(subTopic, pulsar2);
+
+        admin1.topics().unload(subTopic);
+        admin2.topics().unload(subTopic);
+    }
+
     @Test(enabled = false)
     public void testCreateRemoteConsumerFirst() throws Exception {
         super.testReplicatorProducerStatInTopic();
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 4238842bcfa..7a5623f849f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1929,4 +1930,13 @@ public interface TopicPolicies {
      * Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given 
topic asynchronously.
      */
     CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistent(String 
topic, boolean applied);
+
+    /**
+     * Set the replication clusters for the topic.
+     */
+    CompletableFuture<Void> setReplicationClusters(String topic, List<String> 
clusterIds);
+
+    Set<String> getReplicationClusters(String topic, boolean applied) throws 
PulsarAdminException;
+
+    void removeReplicationClusters(String topic) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index f58fd865428..0a4a816640f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1281,6 +1282,36 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncGetRequest(path, new FutureCallback<Boolean>(){});
     }
 
+    @Override
+    public CompletableFuture<Void> setReplicationClusters(String topic, 
List<String> clusterIds) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        return asyncPostRequest(path, Entity.entity(clusterIds, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public Set<String> getReplicationClusters(String topic, boolean applied) 
throws PulsarAdminException {
+        return sync(() -> getReplicationClustersAsync(topic, applied));
+    }
+
+    public CompletableFuture<Set<String>> getReplicationClustersAsync(String 
topic, boolean applied) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        path = path.queryParam("applied", applied);
+        return asyncGetRequest(path, new FutureCallback<Set<String>>(){});
+    }
+
+    @Override
+    public void removeReplicationClusters(String topic) throws 
PulsarAdminException {
+        sync(() -> removeReplicationClustersAsync(topic));
+    }
+
+    public CompletableFuture<Void> removeReplicationClustersAsync(String 
topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        return asyncDeleteRequest(path);
+    }
+
     /*
      * returns topic name with encoded Local Name
      */
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 7b7883af9ac..b49c4d40a53 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.admin.cli;
 import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
 import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -168,6 +169,10 @@ public class CmdTopicPolicies extends CmdBase {
                 new GetDispatcherPauseOnAckStatePersistent());
         addCommand("remove-dispatcher-pause-on-ack-state-persistent",
                 new RemoveDispatcherPauseOnAckStatePersistent());
+
+        addCommand("get-replication-clusters", new GetReplicationClusters());
+        addCommand("set-replication-clusters", new SetReplicationClusters());
+        addCommand("remove-replication-clusters", new 
RemoveReplicationClusters());
     }
 
     @Command(description = "Get entry filters for a topic")
@@ -1993,6 +1998,66 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Command(description = "Set the replication clusters for a topic, global 
policy will be copied to the remote"
+            + " cluster if you enabled namespace level replication.")
+    private class SetReplicationClusters extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "--clusters",
+                "-c" }, description = "Replication Cluster Ids list (comma 
separated values)", required = true)
+        private String clusterIds;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
+                + "If set to true, the policy will be replicate to other 
clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+            getTopicPolicies(isGlobal).setReplicationClusters(persistentTopic, 
clusters);
+        }
+    }
+
+    @Command(description = "Get the replication clusters for a topic")
+    private class GetReplicationClusters extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "-ap", "--applied" }, description = "Get the applied 
policy of the topic. If set to true,"
+                + " the param \"--global\" will be ignored. ")
+        private boolean applied = false;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set \"--applied\" to true, the current param will be 
ignored. ")
+        private boolean isGlobal = false;
+
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            
print(getTopicPolicies(isGlobal).getReplicationClusters(persistentTopic, 
applied));
+        }
+    }
+
+    @Command(description = "Remove the replication clusters for a topic, it 
will not remove the policy from the remote"
+            + "cluster")
+    private class RemoveReplicationClusters extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, the policy will be replicate to other 
clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            
getTopicPolicies(isGlobal).removeReplicationClusters(persistentTopic);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }

Reply via email to