This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 04ddc9b Support for setting geo-replication clusters on topic level
(#12136)
04ddc9b is described below
commit 04ddc9b0324e6d09b6926bbfe5402f006ad633c3
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Nov 4 23:06:18 2021 +0800
Support for setting geo-replication clusters on topic level (#12136)
### Motivation
Currently, when we try to enable the geo-replication, we need to set
multiple clusters for the namespaces, and all the topics under the namespace
will start to replicate data across clusters.
It's useful for adding the replicate clusters in the topic level so that we
can achieve the topics under a namespace can have different replication
clusters such as topic A(cluster-a <-> cluster-b) topic B(cluster-b <->
cluster-c).
### Documentation
I'm not sure if the doc could generate automatically. If no, I will update
the doc later.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 29 --------
.../broker/admin/impl/PersistentTopicsBase.java | 54 +++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 81 ++++++++++++++++++++++
.../broker/service/persistent/PersistentTopic.java | 50 +++++++------
.../pulsar/broker/web/PulsarWebResource.java | 29 ++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 20 ++++++
.../pulsar/broker/service/PersistentTopicTest.java | 50 +++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 58 ++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 77 ++++++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 9 +++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 48 +++++++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 13 ++--
12 files changed, 463 insertions(+), 55 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f2327e2..f89951b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -24,7 +24,6 @@ import static
org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
@@ -1961,34 +1960,6 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
- /**
- * It validates that peer-clusters can't coexist in replication-clusters.
- *
- * @param clusterName: given cluster whose peer-clusters can't be
present into replication-cluster list
- * @param replicationClusters: replication-cluster list
- */
- private void validatePeerClusterConflict(String clusterName, Set<String>
replicationClusters) {
- try {
- ClusterData clusterData =
clusterResources().getCluster(clusterName).orElseThrow(
- () -> new RestException(Status.PRECONDITION_FAILED,
"Invalid replication cluster " + clusterName));
- Set<String> peerClusters = clusterData.getPeerClusterNames();
- if (peerClusters != null && !peerClusters.isEmpty()) {
- SetView<String> conflictPeerClusters =
Sets.intersection(peerClusters, replicationClusters);
- if (!conflictPeerClusters.isEmpty()) {
- log.warn("[{}] {}'s peer cluster can't be part of
replication clusters {}", clientAppId(),
- clusterName, conflictPeerClusters);
- throw new RestException(Status.CONFLICT,
- String.format("%s's peer-clusters %s can't be part
of replication-clusters %s", clusterName,
- conflictPeerClusters,
replicationClusters));
- }
- }
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.warn("[{}] Failed to get cluster-data for {}", clientAppId(),
clusterName, e);
- }
- }
-
protected BundlesData validateBundlesData(BundlesData initialBundles) {
SortedSet<String> partitions = new TreeSet<String>();
for (String partition : initialBundles.getBoundaries()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ed1dc89..fb466bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2816,6 +2816,60 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds) {
+ validateTopicPolicyOperation(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+ if (replicationClusters.contains("global")) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot specify global in the list of replication
clusters");
+ }
+ Set<String> clusters = clusters();
+ for (String clusterId : replicationClusters) {
+ if (!clusters.contains(clusterId)) {
+ throw new RestException(Status.FORBIDDEN, "Invalid cluster id:
" + clusterId);
+ }
+ validatePeerClusterConflict(clusterId, replicationClusters);
+ validateClusterForTenant(namespaceName.getTenant(), clusterId);
+ }
+
+ return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+
topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .thenRun(() -> {
+ log.info("[{}] Successfully set replication
clusters for namespace={}, "
+ + "topic={}, clusters={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+
topicPolicies.getReplicationClusters());
+ });
+ }
+ );
+ }
+
+ protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+ validateTopicPolicyOperation(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+ topicPolicies.setReplicationClusters(null);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .thenRun(() -> {
+ log.info("[{}] Successfully set replication
clusters for namespace={}, "
+ + "topic={}, clusters={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+
topicPolicies.getReplicationClusters());
+ });
+ }
+ );
+ }
+
protected CompletableFuture<Boolean> internalGetDeduplication(boolean
applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 09b694c..1318057 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1678,6 +1678,87 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
@GET
+ @Path("/{tenant}/{namespace}/{topic}/replication")
+ @ApiOperation(value = "Get the replication clusters for a topic")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message =
+ "Topic level policy is disabled, enable the topic level
policy and retry")})
+ public void getReplicationClusters(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") boolean applied,
+ @ApiParam(value = "Is authentication required to
perform this operation")
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenAccept(op -> {
+
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
-> {
+ if (applied) {
+ return
getNamespacePolicies(namespaceName).replication_clusters;
+ }
+ return null;
+ }));
+ })
+ .exceptionally(ex -> {
+ handleTopicPolicyException("getReplicationClusters", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/replication")
+ @ApiOperation(value = "Set the replication clusters for a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the
topic level policy and retry"),
+ @ApiResponse(code = 412, message = "Topic is not global or invalid
cluster ids")})
+ public void setReplicationClusters(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @ApiParam(value = "List of replication clusters", required = true)
List<String> clusterIds) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
+ .thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ handleTopicPolicyException("setReplicationClusters", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/replication")
+ @ApiOperation(value = "Remove the replication clusters from a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the
topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeReplicationClusters(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalRemoveReplicationClusters())
+ .thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ handleTopicPolicyException("removeReplicationClusters",
ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Get message TTL in seconds for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b1ee3e3..71b9471 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1341,33 +1341,15 @@ public class PersistentTopic extends AbstractTopic
log.debug("[{}] Checking replication status", name);
}
- CompletableFuture<Policies> policiesFuture =
brokerService.pulsar().getPulsarResources()
- .getNamespaceResources()
- .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
- .thenCompose(optPolicies -> {
- if (!optPolicies.isPresent()) {
- return FutureUtil.failedFuture(
- new ServerMetadataException(
- new
MetadataStoreException.NotFoundException()));
- }
-
- return
CompletableFuture.completedFuture(optPolicies.get());
- });
+ CompletableFuture<List<String>> replicationClustersFuture =
getReplicationClusters(name);
CompletableFuture<Integer> ttlFuture = getMessageTTL();
- return CompletableFuture.allOf(policiesFuture, ttlFuture)
+ return CompletableFuture.allOf(replicationClustersFuture, ttlFuture)
.thenCompose(__ -> {
- Policies policies = policiesFuture.join();
+ List<String> configuredClusters =
replicationClustersFuture.join();
int newMessageTTLinSeconds = ttlFuture.join();
- Set<String> configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
-
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
// if local cluster is removed from global namespace
cluster-list : then delete topic forcefully
@@ -1408,6 +1390,32 @@ public class PersistentTopic extends AbstractTopic
});
}
+ @VisibleForTesting
+ public CompletableFuture<List<String>> getReplicationClusters(TopicName
topicName) {
+ return brokerService.pulsar()
+ .getTopicPoliciesService()
+ .getTopicPoliciesAsyncWithRetry(topicName, null,
brokerService.pulsar().getExecutor())
+ .thenCompose(topicPolicies -> {
+ if (!topicPolicies.isPresent() ||
topicPolicies.get().getReplicationClusters() == null) {
+ return brokerService.pulsar().getPulsarResources()
+ .getNamespaceResources()
+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+ .thenCompose(optPolicies -> {
+ if (!optPolicies.isPresent()) {
+ return FutureUtil.failedFuture(
+ new ServerMetadataException(
+ new
MetadataStoreException.NotFoundException()));
+ }
+
+ return CompletableFuture.completedFuture(
+
Lists.newArrayList(optPolicies.get().replication_clusters));
+ });
+ } else {
+ return
CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters());
+ }
+ });
+ }
+
@Override
public void checkMessageExpiry() {
getMessageTTL().thenAccept(messageTtlInSeconds -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d4955de..e3ef2b3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -306,6 +307,34 @@ public abstract class PulsarWebResource {
}
}
+ /**
+ * It validates that peer-clusters can't coexist in replication-clusters.
+ *
+ * @clusterName: given cluster whose peer-clusters can't be present into
replication-cluster list
+ * @replicationClusters: replication-cluster list
+ */
+ protected void validatePeerClusterConflict(String clusterName, Set<String>
replicationClusters) {
+ try {
+ ClusterData clusterData =
clusterResources().getCluster(clusterName).orElseThrow(
+ () -> new RestException(Status.PRECONDITION_FAILED,
"Invalid replication cluster " + clusterName));
+ Set<String> peerClusters = clusterData.getPeerClusterNames();
+ if (peerClusters != null && !peerClusters.isEmpty()) {
+ Sets.SetView<String> conflictPeerClusters =
Sets.intersection(peerClusters, replicationClusters);
+ if (!conflictPeerClusters.isEmpty()) {
+ log.warn("[{}] {}'s peer cluster can't be part of
replication clusters {}", clientAppId(),
+ clusterName, conflictPeerClusters);
+ throw new RestException(Status.CONFLICT,
+ String.format("%s's peer-clusters %s can't be part
of replication-clusters %s", clusterName,
+ conflictPeerClusters,
replicationClusters));
+ }
+ }
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get cluster-data for {}", clientAppId(),
clusterName, e);
+ }
+ }
+
protected void validateClusterForTenant(String tenant, String cluster) {
TenantInfo tenantInfo;
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 0383333..535c4a4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -2643,6 +2644,25 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
});
}
+ @Test(timeOut = 30000)
+ public void testReplicatorClusterApi() throws Exception {
+ final String topic = "persistent://" + myNamespace + "/test-" +
UUID.randomUUID();
+ // init cache
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ assertNull(admin.topics().getReplicationClusters(topic, false));
+
+ List<String> clusters = Lists.newArrayList();
+ clusters.add("test");
+ admin.topics().setReplicationClusters(topic, clusters);
+ Awaitility.await().untilAsserted(()
+ -> assertEquals(admin.topics().getReplicationClusters(topic,
false), clusters));
+
+ admin.topics().removeReplicationClusters(topic);
+ Awaitility.await().untilAsserted(()
+ -> assertNull(admin.topics().getReplicationClusters(topic,
false)));
+ }
+
@Test
public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 67faebe..71fbac8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -47,9 +47,11 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -118,6 +120,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -2252,4 +2255,51 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
messageData.writeTo(headers);
return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
}
+
+ @Test
+ public void testGetReplicationClusters() throws Exception {
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ TopicName name = TopicName.get(successTopicName);
+ CompletableFuture<List<String>> replicationClusters =
topic.getReplicationClusters(name);
+ try {
+ replicationClusters.get();
+ fail("Should have failed");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
BrokerServiceException.ServerMetadataException);
+ }
+
+ PulsarResources pulsarResources = spy(new PulsarResources(store,
store));
+ NamespaceResources nsr = spy(new NamespaceResources(store, store, 30));
+ doReturn(nsr).when(pulsarResources).getNamespaceResources();
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ CompletableFuture<Optional<Policies>> policiesFuture = new
CompletableFuture<>();
+ Policies policies = new Policies();
+ Set<String> namespaceClusters = new HashSet<>();
+ namespaceClusters.add("namespace-cluster");
+ policies.replication_clusters = namespaceClusters;
+ Optional<Policies> optionalPolicies = Optional.of(policies);
+ policiesFuture.complete(optionalPolicies);
+ doReturn(policiesFuture).when(nsr).getPoliciesAsync(any());
+
+ topic = new PersistentTopic(successTopicName, ledgerMock,
brokerService);
+ replicationClusters = topic.getReplicationClusters(name);
+
+ assertEquals(replicationClusters.get(), namespaceClusters);
+
+ TopicPoliciesService topicPoliciesService =
mock(TopicPoliciesService.class);
+ doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService();
+ CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new
CompletableFuture<>();
+ TopicPolicies topicPolicies = new TopicPolicies();
+ List<String> topicClusters = new ArrayList<>();
+ topicClusters.add("topic-cluster");
+ topicPolicies.setReplicationClusters(topicClusters);
+ Optional<TopicPolicies> optionalTopicPolicies =
Optional.of(topicPolicies);
+ topicPoliciesFuture.complete(optionalTopicPolicies);
+ when(topicPoliciesService.getTopicPoliciesAsyncWithRetry(any(), any(),
any())).thenReturn(topicPoliciesFuture);
+
+ topic = new PersistentTopic(successTopicName, ledgerMock,
brokerService);
+ replicationClusters = topic.getReplicationClusters(name);
+
+ assertEquals(replicationClusters.get(), topicClusters);
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index daab409..03bca5c 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -3580,4 +3580,62 @@ public interface Topics {
* @return
*/
CompletableFuture<Void> setReplicatedSubscriptionStatusAsync(String topic,
String subName, boolean enabled);
+
+ /**
+ * Get the replication clusters for a topic.
+ *
+ * @param topic
+ * @param applied
+ * @return
+ * @throws PulsarAdminException
+ */
+ Set<String> getReplicationClusters(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * Get the replication clusters for a topic asynchronously.
+ *
+ * @param topic
+ * @param applied
+ * @return
+ * @throws PulsarAdminException
+ */
+ CompletableFuture<Set<String>> getReplicationClustersAsync(String topic,
boolean applied);
+
+ /**
+ * Set the replication clusters for the topic.
+ *
+ * @param topic
+ * @param clusterIds
+ * @return
+ * @throws PulsarAdminException
+ */
+ void setReplicationClusters(String topic, List<String> clusterIds) throws
PulsarAdminException;
+
+ /**
+ * Set the replication clusters for the topic asynchronously.
+ *
+ * @param topic
+ * @param clusterIds
+ * @return
+ * @throws PulsarAdminException
+ */
+ CompletableFuture<Void> setReplicationClustersAsync(String topic,
List<String> clusterIds);
+
+ /**
+ * Remove the replication clusters for the topic.
+ *
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ void removeReplicationClusters(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the replication clusters for the topic asynchronously.
+ *
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ CompletableFuture<Void> removeReplicationClustersAsync(String topic);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index caf32e4..7cd153f 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -3832,5 +3832,82 @@ public class TopicsImpl extends BaseResource implements
Topics {
return asyncPostRequest(path, Entity.entity(enabled,
MediaType.APPLICATION_JSON));
}
+ @Override
+ public Set<String> getReplicationClusters(String topic, boolean applied)
throws PulsarAdminException {
+ try {
+ return getReplicationClustersAsync(topic,
applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getReplicationClustersAsync(String
topic, boolean applied) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "replication");
+ path = path.queryParam("applied", applied);
+ final CompletableFuture<Set<String>> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Set<String>>() {
+ @Override
+ public void completed(Set<String> clusterIds) {
+ future.complete(clusterIds);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setReplicationClusters(String topic, List<String> clusterIds)
throws PulsarAdminException {
+ try {
+ setReplicationClustersAsync(topic,
clusterIds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setReplicationClustersAsync(String topic,
List<String> clusterIds) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "replication");
+ return asyncPostRequest(path, Entity.entity(clusterIds,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeReplicationClusters(String topic) throws
PulsarAdminException {
+ try {
+ removeReplicationClustersAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicationClustersAsync(String
topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "replication");
+ return asyncDeleteRequest(path);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index b311417..ba6153a 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1295,6 +1295,15 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-max-consumers
persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMaxConsumers("persistent://myprop/clust/ns1/ds1", true);
+
+ cmdTopics.run(split("get-replication-clusters
persistent://myprop/clust/ns1/ds1 --applied"));
+
verify(mockTopics).getReplicationClusters("persistent://myprop/clust/ns1/ds1",
true);
+
+ cmdTopics.run(split("set-replication-clusters
persistent://myprop/clust/ns1/ds1 -c test"));
+
verify(mockTopics).setReplicationClusters("persistent://myprop/clust/ns1/ds1",
Lists.newArrayList("test"));
+
+ cmdTopics.run(split("remove-replication-clusters
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removeReplicationClusters("persistent://myprop/clust/ns1/ds1");
}
private static LedgerInfo newLedger(long id, long entries, long size) {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9fa3781..7ba4f73 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -227,6 +227,10 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("set-replicated-subscription-status", new
SetReplicatedSubscriptionStatus());
jcommander.addCommand("get-backlog-size", new
GetBacklogSizeByMessageId());
+ jcommander.addCommand("get-replication-clusters", new
GetReplicationClusters());
+ jcommander.addCommand("set-replication-clusters", new
SetReplicationClusters());
+ jcommander.addCommand("remove-replication-clusters", new
RemoveReplicationClusters());
+
initDeprecatedCommands();
}
@@ -1252,6 +1256,50 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the replication clusters for a
topic")
+ private class GetReplicationClusters extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the
applied policy of the topic")
+ private boolean applied = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopics().getReplicationClusters(persistentTopic,
applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the replication clusters for a
topic")
+ private class SetReplicationClusters extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--clusters",
+ "-c" }, description = "Replication Cluster Ids list (comma
separated values)", required = true)
+ private String clusterIds;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+ getTopics().setReplicationClusters(persistentTopic, clusters);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove the replication clusters for a
topic")
+ private class RemoveReplicationClusters extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopics().removeReplicationClusters(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get the delayed delivery policy for a
topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 1dff1c4..8b5d391 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -18,16 +18,17 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.Getter;
import lombok.NoArgsConstructor;
-import lombok.Setter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@@ -40,15 +41,13 @@ import
org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@Getter
-@Setter
public class TopicPolicies {
@Builder.Default
private Map<String, BacklogQuotaImpl> backLogQuotaMap = new HashMap<>();
@Builder.Default
private List<SubType> subscriptionTypesEnabled = new ArrayList<>();
-
+ private List<String> replicationClusters;
private PersistencePolicies persistence;
private RetentionPolicies retentionPolicies;
private Boolean deduplicationEnabled;
@@ -163,4 +162,8 @@ public class TopicPolicies {
public boolean isSubscribeRateSet() {
return subscribeRate != null;
}
+
+ public Set<String> getReplicationClustersSet() {
+ return replicationClusters != null ?
Sets.newTreeSet(this.replicationClusters) : null;
+ }
}