This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ddc9b  Support for setting geo-replication clusters on topic level 
(#12136)
04ddc9b is described below

commit 04ddc9b0324e6d09b6926bbfe5402f006ad633c3
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Nov 4 23:06:18 2021 +0800

    Support for setting geo-replication clusters on topic level (#12136)
    
    ### Motivation
    Currently, to enable geo-replication we need to set multiple clusters on 
the namespace, and then all the topics under that namespace start to 
replicate data across those clusters.
    
    It is useful to be able to set the replication clusters at the topic level, 
so that topics under the same namespace can have different replication 
clusters, such as topic A (cluster-a <-> cluster-b) and topic B (cluster-b <-> 
cluster-c).
    
    ### Documentation
    I'm not sure whether the doc can be generated automatically. If not, I will 
update the doc later.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 29 --------
 .../broker/admin/impl/PersistentTopicsBase.java    | 54 +++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 81 ++++++++++++++++++++++
 .../broker/service/persistent/PersistentTopic.java | 50 +++++++------
 .../pulsar/broker/web/PulsarWebResource.java       | 29 ++++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 20 ++++++
 .../pulsar/broker/service/PersistentTopicTest.java | 50 +++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 58 ++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 77 ++++++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  9 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 48 +++++++++++++
 .../pulsar/common/policies/data/TopicPolicies.java | 13 ++--
 12 files changed, 463 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f2327e2..f89951b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -24,7 +24,6 @@ import static 
org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
@@ -1961,34 +1960,6 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    /**
-     * It validates that peer-clusters can't coexist in replication-clusters.
-     *
-     * @param clusterName:         given cluster whose peer-clusters can't be 
present into replication-cluster list
-     * @param replicationClusters: replication-cluster list
-     */
-    private void validatePeerClusterConflict(String clusterName, Set<String> 
replicationClusters) {
-        try {
-            ClusterData clusterData = 
clusterResources().getCluster(clusterName).orElseThrow(
-                    () -> new RestException(Status.PRECONDITION_FAILED, 
"Invalid replication cluster " + clusterName));
-            Set<String> peerClusters = clusterData.getPeerClusterNames();
-            if (peerClusters != null && !peerClusters.isEmpty()) {
-                SetView<String> conflictPeerClusters = 
Sets.intersection(peerClusters, replicationClusters);
-                if (!conflictPeerClusters.isEmpty()) {
-                    log.warn("[{}] {}'s peer cluster can't be part of 
replication clusters {}", clientAppId(),
-                            clusterName, conflictPeerClusters);
-                    throw new RestException(Status.CONFLICT,
-                            String.format("%s's peer-clusters %s can't be part 
of replication-clusters %s", clusterName,
-                                    conflictPeerClusters, 
replicationClusters));
-                }
-            }
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), 
clusterName, e);
-        }
-    }
-
     protected BundlesData validateBundlesData(BundlesData initialBundles) {
         SortedSet<String> partitions = new TreeSet<String>();
         for (String partition : initialBundles.getBoundaries()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ed1dc89..fb466bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2816,6 +2816,60 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+        if (replicationClusters.contains("global")) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Cannot specify global in the list of replication 
clusters");
+        }
+        Set<String> clusters = clusters();
+        for (String clusterId : replicationClusters) {
+            if (!clusters.contains(clusterId)) {
+                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: 
" + clusterId);
+            }
+            validatePeerClusterConflict(clusterId, replicationClusters);
+            validateClusterForTenant(namespaceName.getTenant(), clusterId);
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    
topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication 
clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        
topicPolicies.getReplicationClusters());
+                            });
+                }
+        );
+    }
+
+    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(null);
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication 
clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        
topicPolicies.getReplicationClusters());
+                            });
+                }
+        );
+    }
+
     protected CompletableFuture<Boolean> internalGetDeduplication(boolean 
applied) {
         return getTopicPoliciesAsyncWithRetry(topicName)
             .thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 09b694c..1318057 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1678,6 +1678,87 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/replication")
+    @ApiOperation(value = "Get the replication clusters for a topic")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message =
+                    "Topic level policy is disabled, enable the topic level 
policy and retry")})
+    public void getReplicationClusters(@Suspended final AsyncResponse 
asyncResponse,
+                              @PathParam("tenant") String tenant,
+                              @PathParam("namespace") String namespace,
+                              @PathParam("topic") @Encoded String encodedTopic,
+                              @QueryParam("applied") boolean applied,
+                              @ApiParam(value = "Is authentication required to 
perform this operation")
+                              @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenAccept(op -> {
+                    
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
 -> {
+                        if (applied) {
+                            return 
getNamespacePolicies(namespaceName).replication_clusters;
+                        }
+                        return null;
+                    }));
+                })
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getReplicationClusters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/replication")
+    @ApiOperation(value = "Set the replication clusters for a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 412, message = "Topic is not global or invalid 
cluster ids")})
+    public void setReplicationClusters(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @ApiParam(value = "List of replication clusters", required = true) 
List<String> clusterIds) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
+                .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("setReplicationClusters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/replication")
+    @ApiOperation(value = "Remove the replication clusters from a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeReplicationClusters(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalRemoveReplicationClusters())
+                .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("removeReplicationClusters", 
ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/messageTTL")
     @ApiOperation(value = "Get message TTL in seconds for a topic")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b1ee3e3..71b9471 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1341,33 +1341,15 @@ public class PersistentTopic extends AbstractTopic
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = 
brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new 
MetadataStoreException.NotFoundException()));
-                            }
-
-                            return 
CompletableFuture.completedFuture(optPolicies.get());
-                        });
+        CompletableFuture<List<String>> replicationClustersFuture = 
getReplicationClusters(name);
 
         CompletableFuture<Integer> ttlFuture = getMessageTTL();
 
-        return CompletableFuture.allOf(policiesFuture, ttlFuture)
+        return CompletableFuture.allOf(replicationClustersFuture, ttlFuture)
                 .thenCompose(__ -> {
-                    Policies policies = policiesFuture.join();
+                    List<String> configuredClusters = 
replicationClustersFuture.join();
                     int newMessageTTLinSeconds = ttlFuture.join();
 
-                    Set<String> configuredClusters;
-                    if (policies.replication_clusters != null) {
-                        configuredClusters = 
Sets.newTreeSet(policies.replication_clusters);
-                    } else {
-                        configuredClusters = Collections.emptySet();
-                    }
-
                     String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
                     // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
@@ -1408,6 +1390,32 @@ public class PersistentTopic extends AbstractTopic
                 });
     }
 
+    @VisibleForTesting
+    public CompletableFuture<List<String>> getReplicationClusters(TopicName 
topicName) {
+        return brokerService.pulsar()
+                .getTopicPoliciesService()
+                .getTopicPoliciesAsyncWithRetry(topicName, null, 
brokerService.pulsar().getExecutor())
+                .thenCompose(topicPolicies -> {
+                    if (!topicPolicies.isPresent() || 
topicPolicies.get().getReplicationClusters() == null) {
+                        return brokerService.pulsar().getPulsarResources()
+                                .getNamespaceResources()
+                                
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+                                .thenCompose(optPolicies -> {
+                                    if (!optPolicies.isPresent()) {
+                                        return FutureUtil.failedFuture(
+                                                new ServerMetadataException(
+                                                        new 
MetadataStoreException.NotFoundException()));
+                                    }
+
+                                    return CompletableFuture.completedFuture(
+                                            
Lists.newArrayList(optPolicies.get().replication_clusters));
+                                });
+                    } else {
+                        return 
CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters());
+                    }
+                });
+    }
+
     @Override
     public void checkMessageExpiry() {
         getMessageTTL().thenAccept(messageTtlInSeconds -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d4955de..e3ef2b3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
@@ -306,6 +307,34 @@ public abstract class PulsarWebResource {
         }
     }
 
+    /**
+     * It validates that peer-clusters can't coexist in replication-clusters.
+     *
+     * @clusterName: given cluster whose peer-clusters can't be present into 
replication-cluster list
+     * @replicationClusters: replication-cluster list
+     */
+    protected void validatePeerClusterConflict(String clusterName, Set<String> 
replicationClusters) {
+        try {
+            ClusterData clusterData = 
clusterResources().getCluster(clusterName).orElseThrow(
+                    () -> new RestException(Status.PRECONDITION_FAILED, 
"Invalid replication cluster " + clusterName));
+            Set<String> peerClusters = clusterData.getPeerClusterNames();
+            if (peerClusters != null && !peerClusters.isEmpty()) {
+                Sets.SetView<String> conflictPeerClusters = 
Sets.intersection(peerClusters, replicationClusters);
+                if (!conflictPeerClusters.isEmpty()) {
+                    log.warn("[{}] {}'s peer cluster can't be part of 
replication clusters {}", clientAppId(),
+                            clusterName, conflictPeerClusters);
+                    throw new RestException(Status.CONFLICT,
+                            String.format("%s's peer-clusters %s can't be part 
of replication-clusters %s", clusterName,
+                                    conflictPeerClusters, 
replicationClusters));
+                }
+            }
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), 
clusterName, e);
+        }
+    }
+
     protected void validateClusterForTenant(String tenant, String cluster) {
         TenantInfo tenantInfo;
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 0383333..535c4a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.api.client.util.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -2643,6 +2644,25 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         });
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicatorClusterApi() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        assertNull(admin.topics().getReplicationClusters(topic, false));
+
+        List<String> clusters = Lists.newArrayList();
+        clusters.add("test");
+        admin.topics().setReplicationClusters(topic, clusters);
+        Awaitility.await().untilAsserted(()
+                -> assertEquals(admin.topics().getReplicationClusters(topic, 
false), clusters));
+
+        admin.topics().removeReplicationClusters(topic);
+        Awaitility.await().untilAsserted(()
+                -> assertNull(admin.topics().getReplicationClusters(topic, 
false)));
+    }
+
     @Test
     public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 67faebe..71fbac8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -47,9 +47,11 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -118,6 +120,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -2252,4 +2255,51 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         messageData.writeTo(headers);
         return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
     }
+
+    @Test
+    public void testGetReplicationClusters() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        TopicName name = TopicName.get(successTopicName);
+        CompletableFuture<List<String>> replicationClusters = 
topic.getReplicationClusters(name);
+        try {
+            replicationClusters.get();
+            fail("Should have failed");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof 
BrokerServiceException.ServerMetadataException);
+        }
+
+        PulsarResources pulsarResources = spy(new PulsarResources(store, 
store));
+        NamespaceResources nsr = spy(new NamespaceResources(store, store, 30));
+        doReturn(nsr).when(pulsarResources).getNamespaceResources();
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        CompletableFuture<Optional<Policies>> policiesFuture = new 
CompletableFuture<>();
+        Policies policies = new Policies();
+        Set<String> namespaceClusters = new HashSet<>();
+        namespaceClusters.add("namespace-cluster");
+        policies.replication_clusters = namespaceClusters;
+        Optional<Policies> optionalPolicies = Optional.of(policies);
+        policiesFuture.complete(optionalPolicies);
+        doReturn(policiesFuture).when(nsr).getPoliciesAsync(any());
+
+        topic = new PersistentTopic(successTopicName, ledgerMock, 
brokerService);
+        replicationClusters = topic.getReplicationClusters(name);
+
+        assertEquals(replicationClusters.get(), namespaceClusters);
+
+        TopicPoliciesService topicPoliciesService = 
mock(TopicPoliciesService.class);
+        doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService();
+        CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new 
CompletableFuture<>();
+        TopicPolicies topicPolicies = new TopicPolicies();
+        List<String> topicClusters = new ArrayList<>();
+        topicClusters.add("topic-cluster");
+        topicPolicies.setReplicationClusters(topicClusters);
+        Optional<TopicPolicies> optionalTopicPolicies = 
Optional.of(topicPolicies);
+        topicPoliciesFuture.complete(optionalTopicPolicies);
+        when(topicPoliciesService.getTopicPoliciesAsyncWithRetry(any(), any(), 
any())).thenReturn(topicPoliciesFuture);
+
+        topic = new PersistentTopic(successTopicName, ledgerMock, 
brokerService);
+        replicationClusters = topic.getReplicationClusters(name);
+
+        assertEquals(replicationClusters.get(), topicClusters);
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index daab409..03bca5c 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -3580,4 +3580,62 @@ public interface Topics {
      * @return
      */
     CompletableFuture<Void> setReplicatedSubscriptionStatusAsync(String topic, 
String subName, boolean enabled);
+
+    /**
+     * Get the replication clusters for a topic.
+     *
+     * @param topic
+     * @param applied
+     * @return
+     * @throws PulsarAdminException
+     */
+    Set<String> getReplicationClusters(String topic, boolean applied) throws 
PulsarAdminException;
+
+    /**
+     * Get the replication clusters for a topic asynchronously.
+     *
+     * @param topic
+     * @param applied
+     * @return
+     * @throws PulsarAdminException
+     */
+    CompletableFuture<Set<String>> getReplicationClustersAsync(String topic, 
boolean applied);
+
+    /**
+     * Set the replication clusters for the topic.
+     *
+     * @param topic
+     * @param clusterIds
+     * @return
+     * @throws PulsarAdminException
+     */
+    void setReplicationClusters(String topic, List<String> clusterIds) throws 
PulsarAdminException;
+
+    /**
+     * Set the replication clusters for the topic asynchronously.
+     *
+     * @param topic
+     * @param clusterIds
+     * @return
+     * @throws PulsarAdminException
+     */
+    CompletableFuture<Void> setReplicationClustersAsync(String topic, 
List<String> clusterIds);
+
+    /**
+     * Remove the replication clusters for the topic.
+     *
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    void removeReplicationClusters(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the replication clusters for the topic asynchronously.
+     *
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    CompletableFuture<Void> removeReplicationClustersAsync(String topic);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index caf32e4..7cd153f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -3832,5 +3832,82 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return asyncPostRequest(path, Entity.entity(enabled, 
MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public Set<String> getReplicationClusters(String topic, boolean applied) 
throws PulsarAdminException {
+        try {
+            return getReplicationClustersAsync(topic, 
applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getReplicationClustersAsync(String 
topic, boolean applied) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        path = path.queryParam("applied", applied);
+        final CompletableFuture<Set<String>> future = new 
CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Set<String>>() {
+                    @Override
+                    public void completed(Set<String> clusterIds) {
+                        future.complete(clusterIds);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setReplicationClusters(String topic, List<String> clusterIds) 
throws PulsarAdminException {
+        try {
+            setReplicationClustersAsync(topic, 
clusterIds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setReplicationClustersAsync(String topic, 
List<String> clusterIds) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        return asyncPostRequest(path, Entity.entity(clusterIds, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeReplicationClusters(String topic) throws 
PulsarAdminException {
+        try {
+            removeReplicationClustersAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeReplicationClustersAsync(String 
topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replication");
+        return asyncDeleteRequest(path);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index b311417..ba6153a 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1295,6 +1295,15 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1 -ap"));
         verify(mockTopics).getMaxConsumers("persistent://myprop/clust/ns1/ds1", true);
+
+        cmdTopics.run(split("get-replication-clusters persistent://myprop/clust/ns1/ds1 --applied"));
+        verify(mockTopics).getReplicationClusters("persistent://myprop/clust/ns1/ds1", true);
+
+        cmdTopics.run(split("set-replication-clusters persistent://myprop/clust/ns1/ds1 -c test"));
+        verify(mockTopics).setReplicationClusters("persistent://myprop/clust/ns1/ds1", Lists.newArrayList("test"));
+
+        cmdTopics.run(split("remove-replication-clusters persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeReplicationClusters("persistent://myprop/clust/ns1/ds1");
     }
 
     private static LedgerInfo newLedger(long id, long entries, long size) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9fa3781..7ba4f73 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -227,6 +227,10 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus());
         jcommander.addCommand("get-backlog-size", new GetBacklogSizeByMessageId());
 
+        jcommander.addCommand("get-replication-clusters", new GetReplicationClusters());
+        jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
+        jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters());
+
         initDeprecatedCommands();
     }
 
@@ -1252,6 +1256,50 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the replication clusters for a topic")
+    private class GetReplicationClusters extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopics().getReplicationClusters(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the replication clusters for a topic")
+    private class SetReplicationClusters extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--clusters",
+                "-c" }, description = "Replication Cluster Ids list (comma separated values)", required = true)
+        private String clusterIds;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+            getTopics().setReplicationClusters(persistentTopic, clusters);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the replication clusters for a topic")
+    private class RemoveReplicationClusters extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopics().removeReplicationClusters(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the delayed delivery policy for a topic")
     private class GetDelayedDelivery extends CliCommand {
         @Parameter(description = "tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 1dff1c4..8b5d391 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -18,16 +18,17 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@@ -40,15 +41,13 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@Getter
-@Setter
 public class TopicPolicies {
 
     @Builder.Default
     private Map<String, BacklogQuotaImpl> backLogQuotaMap = new HashMap<>();
     @Builder.Default
     private List<SubType> subscriptionTypesEnabled = new ArrayList<>();
-
+    private List<String> replicationClusters;
     private PersistencePolicies persistence;
     private RetentionPolicies retentionPolicies;
     private Boolean deduplicationEnabled;
@@ -163,4 +162,8 @@ public class TopicPolicies {
     public boolean isSubscribeRateSet() {
         return subscribeRate != null;
     }
+
+    public Set<String> getReplicationClustersSet() {
+        return replicationClusters != null ? Sets.newTreeSet(this.replicationClusters) : null;
+    }
 }

Reply via email to