This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e57827b333 [improve][admin] PIP-422 part 1: Support global
topic-level replicated clusters policy (#24390)
1e57827b333 is described below
commit 1e57827b33395a2124593d95fa7641ee5e125d9b
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 4 10:16:39 2025 +0800
[improve][admin] PIP-422 part 1: Support global topic-level replicated
clusters policy (#24390)
---
.../broker/admin/impl/PersistentTopicsBase.java | 54 ++++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 67 +++---
.../SystemTopicBasedTopicPoliciesService.java | 35 +++-
.../broker/service/TopicPoliciesService.java | 9 +
.../broker/service/persistent/PersistentTopic.java | 48 ++++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 39 +++-
.../broker/service/OneWayReplicatorTest.java | 233 ++++++++++++++++++---
.../broker/service/OneWayReplicatorTestBase.java | 49 ++++-
...OneWayReplicatorUsingGlobalPartitionedTest.java | 61 +++++-
.../service/OneWayReplicatorUsingGlobalZKTest.java | 207 ++++++++++++++++++
.../apache/pulsar/client/admin/TopicPolicies.java | 10 +
.../client/admin/internal/TopicPoliciesImpl.java | 31 +++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 65 ++++++
13 files changed, 813 insertions(+), 95 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0e8509db186..459d82cede8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3360,7 +3360,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds) {
+ protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds, boolean isGlobal) {
if (CollectionUtils.isEmpty(clusterIds)) {
return CompletableFuture.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
"ClusterIds should not be null or empty"));
@@ -3373,6 +3373,47 @@ public class PersistentTopicsBase extends AdminResource {
"Cannot specify global in the list of
replication clusters");
}
})
+ .thenCompose(__ -> {
+ // Set a topic-level replicated clusters that do not
contain local cluster is not meaningful, except
+ // the following scenario: User has two clusters, which
enabled Geo-Replication through a global
+ // metadata store, the resources named partitioned topic
metadata and the resource namespace-level
+ // "replicated clusters" are shared between multiple
clusters. Pulsar can hardly delete a specific
+ // partitioned topic. To support this use case, the
following steps can implement it:
+ // 1. set a global topic-level replicated clusters that do
not contain local cluster.
+ // 2. the local cluster will remove the subtopics
automatically, and remove the schemas and local
+ // topic policies. Just leave the global topic policies
there, which prevents the namespace level
+ // replicated clusters policy taking effect.
+ boolean clustersDoesNotContainsLocal =
CollectionUtils.isEmpty(clusterIds)
+ ||
!clusterIds.contains(pulsar().getConfig().getClusterName());
+ if (clustersDoesNotContainsLocal && !isGlobal) {
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Can not remove local cluster from the local
topic-level replication clusters policy"));
+ }
+ if (isGlobal) {
+ return
getNamespacePoliciesAsync(namespaceName).thenCompose(v -> {
+ // Since global policies depends on namespace
level replication, users only can set global
+ // policies when namespace level replication
exists. Otherwise, the policies will never be
+ // copied to the remote side, which is meaningless.
+ if (v == null || v.replication_clusters.size() <
2) {
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Please do not use the global topic level
policy when namespace-level replication"
+ + " is not enabled, because the global
level policy relies on namespace-level"
+ + " replication"));
+ }
+ for (String clusterId : clusterIds) {
+ if
(v.replication_clusters.contains(clusterId)) {
+ continue;
+ }
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "The policies at the global topic level
will only be copied to the clusters"
+ + " included in the namespace level
replication. Therefore, please do not set the"
+ + " policies including other clusters"));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ })
.thenCompose(__ -> clustersAsync())
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures = new
ArrayList<>(replicationClusters.size());
@@ -3405,9 +3446,10 @@ public class PersistentTopicsBase extends AdminResource {
topicMetaOp.get().partitions).values());
});
}).thenCompose(__ ->
- getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op
-> {
+ getTopicPoliciesAsyncWithRetry(topicName,
isGlobal).thenCompose(op -> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicationClusters(clusterIds);
+ topicPolicies.setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
.thenRun(() -> {
log.info("[{}] Successfully set
replication clusters for namespace={}, "
@@ -3421,12 +3463,16 @@ public class PersistentTopicsBase extends AdminResource
{
));
}
- protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+ protected CompletableFuture<Void>
internalRemoveReplicationClusters(boolean isGlobal) {
return validatePoliciesReadOnlyAccessAsync()
- .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal))
.thenCompose(op -> {
+ if (op.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicationClusters(null);
+ topicPolicies.setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
.thenRun(() -> {
log.info("[{}] Successfully set replication
clusters for namespace={}, "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ac38bd67ee7..c539dad7426 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2304,6 +2304,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false")
boolean isGlobal,
@QueryParam("applied") @DefaultValue("false")
boolean applied,
@ApiParam(value = "Whether leader broker
redirected this call to this broker. "
+ "For internal use.")
@@ -2311,21 +2312,48 @@ public class PersistentTopics extends
PersistentTopicsBase {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
- .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
- .thenAccept(op -> {
-
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
-> {
- if (applied) {
- return
getNamespacePolicies(namespaceName).replication_clusters;
+ .thenCompose(__ -> {
+ if (applied) {
+ return getAppliedReplicatedClusters();
+ }
+ return getTopicPoliciesAsyncWithRetry(topicName,
isGlobal).thenApply(policy -> {
+ if (policy != null && policy.isPresent()
+ &&
CollectionUtils.isNotEmpty(policy.get().getReplicationClustersSet())) {
+ return policy.get().getReplicationClustersSet();
}
return null;
- }));
+ });
})
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicationClusters", ex,
asyncResponse);
return null;
});
}
+ private CompletableFuture<Set<String>> getAppliedReplicatedClusters() {
+ return getTopicPoliciesAsyncWithRetry(topicName, false)
+ .thenCompose(localPolicy -> {
+ if (localPolicy != null && localPolicy.isPresent()
+ &&
CollectionUtils.isNotEmpty(localPolicy.get().getReplicationClustersSet())) {
+ return
CompletableFuture.completedFuture(localPolicy.get().getReplicationClustersSet());
+ }
+ return getTopicPoliciesAsyncWithRetry(topicName, true)
+ .thenCompose(globalPolicy -> {
+ if (globalPolicy != null && globalPolicy.isPresent()
+ &&
CollectionUtils.isNotEmpty(globalPolicy.get().getReplicationClustersSet())) {
+ return
CompletableFuture.completedFuture(globalPolicy.get().getReplicationClustersSet());
+ }
+ return
getNamespacePoliciesAsync(namespaceName).thenApply(v -> {
+ if (v != null) {
+ return v.replication_clusters;
+ }
+ return null;
+ });
+ });
+ });
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/replication")
@ApiOperation(value = "Set the replication clusters for a topic.")
@@ -2341,32 +2369,14 @@ public class PersistentTopics extends
PersistentTopicsBase {
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "List of replication clusters", required = true)
List<String> clusterIds) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE)
- .thenCompose(__ ->
preValidation(authoritative)).thenCompose(__ -> {
- // Set a topic-level replicated clusters that do not
contain local cluster is not meaningful, except
- // the following scenario: User has two clusters, which
enabled Geo-Replication through a global
- // metadata store, the resources named partitioned topic
metadata and the resource namespace-level
- // "replicated clusters" are shared between multi
clusters. Pulsar can hardly delete a specify
- // partitioned topic. To support this use case, the
following steps can implement it:
- // 1. set a global topic-level replicated clusters that do
not contain local cluster.
- // 2. the local cluster will remove the subtopics
automatically, and remove the schemas and local
- // topic policies. Just leave the global topic policies
there, which prevents the namespace level
- // replicated clusters policy taking affect.
- // TODO But the API "pulsar-admin topics
set-replication-clusters" does not support global policy,
- // to support this scenario, a PIP is needed.
- boolean clustersDoesNotContainsLocal =
CollectionUtils.isEmpty(clusterIds)
- ||
!clusterIds.contains(pulsar().getConfig().getClusterName());
- if (clustersDoesNotContainsLocal) {
- return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
- "Can not remove local cluster from the topic-level
replication clusters policy"));
- }
- return CompletableFuture.completedFuture(null);
- })
- .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ -> internalSetReplicationClusters(clusterIds,
isGlobal))
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setReplicationClusters", ex,
asyncResponse);
@@ -2387,12 +2397,13 @@ public class PersistentTopics extends
PersistentTopicsBase {
public void removeReplicationClusters(@Suspended final AsyncResponse
asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
- .thenCompose(__ -> internalRemoveReplicationClusters())
+ .thenCompose(__ -> internalRemoveReplicationClusters(isGlobal))
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("removeReplicationClusters",
ex, asyncResponse);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4b833502095..4e858fc91a1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -139,6 +139,17 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName) {
+ return deleteTopicPoliciesAsync(topicName, false);
+ }
+
+ /**
+ * @param keepGlobalPolicies only used when a topic is deleted because
users removed the current
+ * cluster from the policy "replicatedClusters".
+ * See also https://github.com/apache/pulsar/blob/master/pip/pip-422.md
+ */
+ @Override
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName,
+ boolean
keepGlobalPolicies) {
if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) ||
isSelf(topicName)) {
return CompletableFuture.completedFuture(null);
}
@@ -167,7 +178,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.info("Skip delete topic-level policies because {} has been
removed before", changeEvents);
return CompletableFuture.completedFuture(null);
}
- return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+ return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
+ keepGlobalPolicies);
});
}
@@ -177,11 +189,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return CompletableFuture.failedFuture(new
BrokerServiceException.NotAllowedException(
"Not allowed to update topic policy for the heartbeat
topic"));
}
- return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
+ return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies,
false);
}
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName,
ActionType actionType,
- @Nullable
TopicPolicies policies) {
+ @Nullable TopicPolicies policies, boolean
keepGlobalPoliciesAfterDeleting) {
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
@@ -203,7 +215,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
result.completeExceptionally(cause);
} else {
CompletableFuture<MessageId> writeFuture =
-
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
+
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
+
keepGlobalPoliciesAfterDeleting);
writeFuture.whenComplete((messageId, e) ->
{
if (e != null) {
result.completeExceptionally(e);
@@ -223,14 +236,20 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
private CompletableFuture<MessageId>
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
- SystemTopicClient.Writer<PulsarEvent>
writer,
- @Nullable TopicPolicies policies) {
+ SystemTopicClient.Writer<PulsarEvent> writer, @Nullable
TopicPolicies policies,
+ boolean keepGlobalPoliciesAfterDeleting) {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
if (!ActionType.DELETE.equals(actionType)) {
return writer.writeAsync(getEventKey(event, policies != null &&
policies.isGlobalPolicies()), event);
}
// When a topic is deleting, delete both non-global and global
topic-level policies.
- CompletableFuture<MessageId> deletePolicies =
writer.deleteAsync(getEventKey(event, true), event)
+ CompletableFuture<MessageId> dealWithGlobalPolicy;
+ if (keepGlobalPoliciesAfterDeleting) {
+ dealWithGlobalPolicy = CompletableFuture.completedFuture(null);
+ } else {
+ dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event,
true), event);
+ }
+ CompletableFuture<MessageId> deletePolicies = dealWithGlobalPolicy
.thenCompose(__ -> {
return writer.deleteAsync(getEventKey(event, false), event);
});
@@ -620,7 +639,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
SystemTopicClient<PulsarEvent> systemTopicClient =
getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer -> {
- sendTopicPolicyEventInternal(topicName,
ActionType.DELETE, writer, event.getPolicies())
+ sendTopicPolicyEventInternal(topicName,
ActionType.DELETE, writer, event.getPolicies(), false)
.whenComplete((result, e) -> writer.closeAsync()
.whenComplete((res, ex) -> {
if (ex != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index ec5da7995bf..02303fd9c4a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Topic policies service.
@@ -38,6 +40,8 @@ public interface TopicPoliciesService extends AutoCloseable {
String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__";
+ Logger LOG = LoggerFactory.getLogger(TopicPoliciesService.class);
+
TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
/**
@@ -47,6 +51,11 @@ public interface TopicPoliciesService extends AutoCloseable {
*/
CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName);
+ default CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName,
+ boolean
keepGlobalPoliciesAfterDeleting) {
+ return deleteTopicPoliciesAsync(topicName);
+ }
+
/**
* Update policies for a topic asynchronously.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5602cd67951..d8cbc0cb453 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1929,7 +1929,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return CompletableFuture.completedFuture(null);
}
- String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
+ final String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
return checkAllowedCluster(localCluster).thenCompose(success -> {
if (!success) {
@@ -1980,7 +1980,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
});
}
+ /**
+ * There is only one case that will remove the local cluster: using a global
metadata store, which means that
+ * namespaces share policies across multiple clusters, including
"replicated clusters" and "partitioned topic
+ * metadata", so we can hardly delete a partitioned topic from one cluster and
keep it existing in another.
+ * Users remove the local cluster from "replicated clusters" to delete the topic from
one of the clusters.
+ */
CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
+ final String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
TopicName tName = TopicName.get(topic);
if (!tName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
@@ -2018,15 +2025,36 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
});
- // There are only one cases that will
remove local clusters: using global metadata
- // store, namespaces will share policies
cross multi clusters, including
- // "replicated clusters" and "partitioned
topic metadata", we can hardly delete
- // partitioned topic from one cluster and
keep it exists in another. Removing
- // local cluster from the namespace level
"replicated clusters" can do this.
- // TODO: there is no way to delete a
specify partitioned topic if users have enabled
- // Geo-Replication with a global metadata
store, a PIP is needed.
- // Since the system topic
"__change_events" under the namespace will also be
- // deleted, we can skip to delete
topic-level policies.
+
+ // Two cases that can run up to here:
+ // 1. Namespace level removing local
cluster: all topic policies will be removed
+ // when the system topic "__change_events"
is deleting.
+ // 2. Global topic level removing local
cluster: we need to remove local topic-level
+ // policies here, but leave global
topic-level policies there to avoid the namespace
+ // level policies take effect, whose
policies still contains local cluster.
+ boolean changeEventsAlsoBeingDeleted =
!topicPolicies.getReplicationClusters()
+
.getNamespaceValue().contains(localCluster);
+ if (changeEventsAlsoBeingDeleted) {
+ log.info("Skip to deleted topic
policies[{}] after all partitions[{}] were"
+ + " removed because the system
topic __change_events will be removed.",
+ partitionedName,
metadataOp.get().partitions);
+ return;
+ }
+
brokerService.getPulsar().getTopicPoliciesService()
+
.deleteTopicPoliciesAsync(partitionedName, true)
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ log.info("Deleted topic
policies[{}] after all partitions[{}] were"
+ + " removed because the
current cluster has bee removed from"
+ + " topic policies. Global
policies will not be deleted.",
+ partitionedName,
metadataOp.get().partitions);
+ } else {
+ log.error("Failed to delete
topic policies[{}] after all partitions[{}]"
+ + " were removed, when
the current cluster has bee removed from"
+ + " topic policies",
+ partitionedName,
metadataOp.get().partitions, ex);
+ }
+ });
}
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 08c86f62de3..1d911c52c83 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
@@ -4070,19 +4071,51 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test(dataProvider = "topicTypes")
public void testRemoveLocalCluster(TopicType topicType) throws Exception {
- String topic = "persistent://" + myNamespace +
"/testSetSubRateWithSub";
+ final String cluster = pulsar.getConfig().getClusterName();
+ final String topic = "persistent://" + myNamespace +
"/testSetSubRateWithSub";
if (TopicType.PARTITIONED.equals(topicType)) {
admin.topics().createNonPartitionedTopic(topic);
} else {
admin.topics().createPartitionedTopic(topic, 2);
}
+ // Can not remove current cluster by local topic-level policies.
try {
admin.topics().setReplicationClusters(topic,
Arrays.asList("not-local-cluster"));
fail("Can not remove local cluster from the topic-level
replication clusters policy");
} catch (PulsarAdminException.PreconditionFailedException e) {
- assertTrue(e.getMessage().contains("Can not remove local cluster
from the topic-level replication clusters"
- + " policy"));
+ assertTrue(e.getMessage().contains("Can not remove local cluster
from the local topic-level replication"
+ + " clusters policy"));
}
+ try {
+ admin.topicPolicies(false).setReplicationClusters(topic,
Arrays.asList("not-local-cluster")).get();
+ fail("Can not remove local cluster from the topic-level
replication clusters policy");
+ } catch (Exception e) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(actEx.getMessage().contains("Can not remove local
cluster from the local topic-level replication"
+ + " clusters policy"));
+ }
+ // Can not use the global topic level policy when namespace-level
replication is not enabled.
+ try {
+ admin.topicPolicies(true).setReplicationClusters(topic,
Arrays.asList("not-local-cluster")).get();
+ fail("Please do not use the global topic level policy when
namespace-level replication is not enabled,"
+ + " because the global level policy relies on namespace-level
replication");
+ } catch (Exception e) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(actEx.getMessage().contains("namespace-level
replication"));
+ }
+ // Can not set global topic level clusters that does not exist in
namespace-level replication.
+ admin.namespaces().setNamespaceReplicationClusters(myNamespace,
Collections.singleton(cluster));
+ try {
+ admin.topicPolicies(true)
+ .setReplicationClusters(topic,
Arrays.asList("not-local-cluster", cluster)).get();
+ fail("The policies at the global topic level will only be copied
to the clusters included in the namespace"
+ + " level replication. Therefore, please do not set the
policies at the global topic level to"
+ + " other clusters");
+ } catch (Exception e) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(actEx.getMessage().contains("namespace-level
replication"));
+ }
+
// cleanup.
if (TopicType.PARTITIONED.equals(topicType)) {
admin.topics().delete(topic, false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b427adbc550..de6c5f9f356 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -70,6 +70,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
@@ -138,34 +139,6 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
- private void waitReplicatorStopped(String topicName) {
- Awaitility.await().untilAsserted(() -> {
- Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
- assertTrue(topicOptional2.isPresent());
- PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
- assertTrue(persistentTopic2.getProducers().isEmpty());
- Optional<Topic> topicOptional1 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
- assertTrue(topicOptional1.isPresent());
- PersistentTopic persistentTopic1 = (PersistentTopic)
topicOptional2.get();
- assertTrue(persistentTopic1.getReplicators().isEmpty()
- ||
!persistentTopic1.getReplicators().get(cluster2).isConnected());
- });
- }
-
- /**
- * Override "AbstractReplicator.producer" by {@param producer} and return
the original value.
- */
- private ProducerImpl overrideProducerForReplicator(AbstractReplicator
replicator, ProducerImpl newProducer)
- throws Exception {
- Field producerField =
AbstractReplicator.class.getDeclaredField("producer");
- producerField.setAccessible(true);
- ProducerImpl originalValue = (ProducerImpl)
producerField.get(replicator);
- synchronized (replicator) {
- producerField.set(replicator, newProducer);
- }
- return originalValue;
- }
-
@Test(timeOut = 45 * 1000)
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
@@ -224,6 +197,198 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
}
+ @Test
+ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_123");
+ final String subTopic =
TopicName.get(topicName).getPartition(0).toString();
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ producer1.close();
+ waitReplicatorStarted(subTopic, pulsar2);
+ Set<String> clustersApplied =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied.contains(cluster1));
+ assertTrue(clustersApplied.contains(cluster2));
+
+ // Remove topic from a cluster.
+ admin1.topicPolicies(true).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1.contains(cluster1));
+ assertFalse(clustersApplied1.contains(cluster2));
+ Set<String> clustersApplied2 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied2.contains(cluster1));
+ assertFalse(clustersApplied2.contains(cluster2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global1);
+ assertTrue(global1.contains(cluster1));
+ assertFalse(global1.contains(cluster2));
+
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global2);
+ assertTrue(global2.contains(cluster1));
+ assertFalse(global2.contains(cluster2));
+ });
+ waitReplicatorStopped(subTopic, true);
+
+ // Remove global policy.
+ admin1.topicPolicies(true).removeReplicationClusters(topicName);
+ Producer<byte[]> producer2 =
client1.newProducer().topic(topicName).create();
+ producer2.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1.contains(cluster1));
+ assertTrue(clustersApplied1.contains(cluster2));
+ Set<String> clustersApplied2 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2.contains(cluster1));
+ assertTrue(clustersApplied2.contains(cluster2));
+
+ Set<String> clusters1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(clusters1));
+ Set<String> clusters2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(clusters2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+
+ admin1.topics().unload(subTopic);
+ admin2.topics().unload(subTopic);
+ }
+
+ /**
+ * Test: policies overwrite and applied policies.
+ */
+ @Test
+ public void testPoliciesOverWrite() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_123");
+ final String subTopic =
TopicName.get(topicName).getPartition(0).toString();
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ producer1.close();
+ waitReplicatorStarted(subTopic, pulsar2);
+ Set<String> clustersApplied1 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1.contains(cluster1));
+ assertTrue(clustersApplied1.contains(cluster2));
+ // Set clusters for cluster2 to avoid deleting the topic. This feature is
needed for the following situation:
+ // - There are 3 clusters using a shared metadata store.
+ // - The user wants to delete the topic on the cluster "c2", and to stop
replication on the cluster "c3 -> c1".
+ // - The user will apply the following configurations:
+ // - Set a global policy: [c1, c3].
+ // - Set a local policy for the cluster "c3": [c3].
+ Awaitility.await().untilAsserted(() -> {
+ admin2.topics().setReplicationClusters(topicName,
Arrays.asList(cluster2));
+ Set<String> clustersApplied2 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2.contains(cluster1));
+ assertTrue(clustersApplied2.contains(cluster2));
+ });
+
+ // Cluster1: Global policy overwrite namespace policy.
+ // Cluster2: Global policy never overwrite namespace policy.
+ admin1.topicPolicies(true).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied10 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied10.contains(cluster1));
+ assertFalse(clustersApplied10.contains(cluster2));
+ Set<String> clustersApplied20 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied20.contains(cluster1));
+ assertTrue(clustersApplied20.contains(cluster2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global1);
+ assertTrue(global1.contains(cluster1));
+ assertFalse(global1.contains(cluster2));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global2);
+ assertTrue(global2.contains(cluster1));
+ assertFalse(global2.contains(cluster2));
+ });
+ waitReplicatorStopped(subTopic, false);
+
+ // Remove global policy.
+ admin1.topicPolicies(true).removeReplicationClusters(topicName);
+ Producer<byte[]> producer2 =
client1.newProducer().topic(topicName).create();
+ producer2.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied10 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied10.contains(cluster1));
+ assertTrue(clustersApplied10.contains(cluster2));
+ Set<String> clustersApplied20 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied20.contains(cluster1));
+ assertTrue(clustersApplied20.contains(cluster2));
+
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+
+ // Cluster1: Local policy overwrite namespace policy.
+ // Cluster2: Global policy never overwrite namespace policy.
+ admin1.topicPolicies(false).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Producer<byte[]> producer3 =
client1.newProducer().topic(topicName).create();
+ producer3.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied10 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied10.contains(cluster1));
+ assertFalse(clustersApplied10.contains(cluster2));
+ Set<String> clustersApplied20 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied20.contains(cluster1));
+ assertTrue(clustersApplied20.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local1));
+ assertTrue(local1.contains(cluster1));
+ assertFalse(local1.contains(cluster2));
+
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+ assertFalse(local2.contains(cluster1));
+ });
+ waitReplicatorStopped(subTopic, false);
+
+ // Remove local policy.
+ admin1.topicPolicies(false).removeReplicationClusters(topicName);
+ Producer<byte[]> producer4 =
client1.newProducer().topic(topicName).create();
+ producer4.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+
+ admin1.topics().unload(subTopic);
+ admin2.topics().unload(subTopic);
+ }
+
@Test(timeOut = 45 * 1000)
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
@@ -549,8 +714,8 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
assertEquals(topicMetadata2.partitions, 2);
// cleanup.
admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
- waitReplicatorStopped(partition0);
- waitReplicatorStopped(partition1);
+ waitReplicatorStopped(partition0, false);
+ waitReplicatorStopped(partition1, false);
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}
@@ -600,8 +765,8 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
// cleanup.
admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
- waitReplicatorStopped(partition0);
- waitReplicatorStopped(partition1);
+ waitReplicatorStopped(partition0, false);
+ waitReplicatorStopped(partition1, false);
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}
@@ -1325,7 +1490,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// cleanup.
taskToClearInjection.run();
admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
- waitReplicatorStopped(topicName);
+ waitReplicatorStopped(topicName, false);
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}
@@ -1637,7 +1802,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// cleanup.
producer1.close();
admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
- waitReplicatorStopped(topicName);
+ waitReplicatorStopped(topicName, false);
admin1.topics().delete(topicName, false);
if (originalReplClient2 == null) {
pulsar1.getBrokerService().getReplicationClients().remove(cluster2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 82c6267c153..58d8bcf7045 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
@@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -371,14 +373,59 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
}
protected void waitReplicatorStarted(String topicName) {
+ waitReplicatorStarted(topicName, pulsar2);
+ }
+
+ protected void waitReplicatorStarted(String topicName, PulsarService
remoteCluster) {
Awaitility.await().untilAsserted(() -> {
- Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
+ Optional<Topic> topicOptional2 =
remoteCluster.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional2.isPresent());
PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
assertFalse(persistentTopic2.getProducers().isEmpty());
});
}
+ protected void waitReplicatorStopped(String topicName, boolean
remoteTopicExpectedDeleted) {
+ waitReplicatorStopped(topicName, pulsar1, pulsar2,
remoteTopicExpectedDeleted);
+ }
+
+    /**
+     * Waits until replication of {@code topicName} from {@code sourceCluster} to {@code remoteCluster} has stopped.
+     *
+     * <p>The wait succeeds once all of the following hold in the same evaluation:
+     * <ul>
+     *   <li>the topic lookup on the remote broker is empty when {@code remoteTopicExpectedDeleted} is true,
+     *       otherwise it is present and none of its producers has a name starting with the replicator prefix;</li>
+     *   <li>the topic lookup on the source broker is present;</li>
+     *   <li>the source topic has no replicator for the remote cluster, or that replicator is not connected.</li>
+     * </ul>
+     *
+     * @param topicName the topic to look up on both brokers
+     * @param sourceCluster the broker whose replicator is expected to be stopped
+     * @param remoteCluster the broker that was the replication target
+     * @param remoteTopicExpectedDeleted true if the topic must be absent on the remote broker, false if it must exist
+     */
+    protected void waitReplicatorStopped(String topicName, PulsarService sourceCluster, PulsarService remoteCluster,
+                                         boolean remoteTopicExpectedDeleted) {
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> remoteTp = remoteCluster.getBrokerService().getTopic(topicName, false).get();
+            if (remoteTopicExpectedDeleted) {
+                assertTrue(remoteTp.isEmpty());
+            } else {
+                assertTrue(remoteTp.isPresent());
+                PersistentTopic remoteTp1 = (PersistentTopic) remoteTp.get();
+                for (org.apache.pulsar.broker.service.Producer p : remoteTp1.getProducers().values()) {
+                    assertFalse(p.getProducerName().startsWith(remoteCluster.getConfig().getReplicatorPrefix()));
+                }
+            }
+            Optional<Topic> sourceTp = sourceCluster.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(sourceTp.isPresent());
+            PersistentTopic sourceTp1 = (PersistentTopic) sourceTp.get();
+            // Look the replicator up once and tolerate a missing entry: having no replicator for the remote
+            // cluster also means replication is stopped, and dereferencing a null entry would throw an NPE
+            // instead of failing an assertion.
+            var replicator = sourceTp1.getReplicators().get(remoteCluster.getConfig().getClusterName());
+            assertTrue(replicator == null || !replicator.isConnected());
+        });
+    }
+
+ /**
+ * Replaces the declared "producer" field of {@link AbstractReplicator} on {@code replicator} with
+ * {@code newProducer} via reflection and returns the value the field held before.
+ *
+ * <p>The previous value is read before the replicator's monitor is taken; only the write happens inside
+ * {@code synchronized (replicator)}.
+ *
+ * @param replicator the replicator whose "producer" field is overwritten
+ * @param newProducer the value to store in the field
+ * @return the value that was stored in the field before the overwrite
+ * @throws Exception if the field cannot be looked up or accessed reflectively
+ */
+ protected ProducerImpl overrideProducerForReplicator(AbstractReplicator
replicator, ProducerImpl newProducer)
+ throws Exception {
+ Field producerField =
AbstractReplicator.class.getDeclaredField("producer");
+ producerField.setAccessible(true);
+ ProducerImpl originalValue = (ProducerImpl)
producerField.get(replicator);
+ synchronized (replicator) {
+ producerField.set(replicator, newProducer);
+ }
+ return originalValue;
+ }
+
protected PulsarClient initClient(ClientBuilder clientBuilder) throws
Exception {
return clientBuilder.build();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 2030a22d7e0..a0099f97fea 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -78,6 +79,18 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+ super.testDeleteRemoteTopicByGlobalPolicy();
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testPoliciesOverWrite() throws Exception {
+ super.testPoliciesOverWrite();
+ }
+
@Override
@Test(enabled = false)
public void testCreateRemoteConsumerFirst() throws Exception {
@@ -180,8 +193,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
super.testNonPersistentReplicatorQueueSize();
}
- @Test(timeOut = 60_000)
- public void testRemoveCluster() throws Exception {
+ @DataProvider
+ public Object[][] removeClusterLevels() {
+ return new Object[][] {
+ {"namespace"},
+ {"topic"}
+ };
+ }
+
+ @Test(timeOut = 60_000, dataProvider = "removeClusterLevels")
+ public void testRemoveCluster(String removeClusterLevel) throws Exception {
// Initialize.
final String ns1 = defaultTenant + "/" +
"ns_73b1a31afce34671a5ddc48fe5ad7fc8";
final String topic = "persistent://" + ns1 +
"/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
@@ -235,15 +256,27 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
// The topics under the namespace of the cluster-1 will be deleted.
// Verify the result.
- admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
+ if ("namespace".equals(removeClusterLevel)) {
+ admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
+ } else {
+ admin1.topicPolicies(true).setReplicationClusters(topic,
Arrays.asList(cluster2));
+ admin2.topicPolicies(true).setReplicationClusters(topic,
Arrays.asList(cluster2));
+ }
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(()
-> {
Map<String, CompletableFuture<Optional<Topic>>> tps =
pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topicP0));
assertFalse(tps.containsKey(topicP1));
- assertFalse(tps.containsKey(topicChangeEvents));
- assertFalse(pulsar1.getNamespaceService()
- .checkTopicExistsAsync(TopicName.get(topicChangeEvents))
- .get(5, TimeUnit.SECONDS).isExists());
+ if ("namespace".equals(removeClusterLevel)) {
+ assertFalse(tps.containsKey(topicChangeEvents));
+ assertFalse(pulsar1.getNamespaceService()
+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
+ .get(5, TimeUnit.SECONDS).isExists());
+ } else {
+ assertTrue(tps.containsKey(topicChangeEvents));
+ assertTrue(pulsar1.getNamespaceService()
+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
+ .get(5, TimeUnit.SECONDS).isExists());
+ }
// Verify: schema will be removed in local cluster, and remote
cluster will not.
List<CompletableFuture<StoredSchema>> schemaList13 =
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
@@ -252,6 +285,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
assertEquals(schemaList23.size(), 1);
// Verify: the topic policies will be removed in local cluster,
but remote cluster will not.
+            if ("topic".equals(removeClusterLevel)) {
+                // Topic-level removal: cluster-1 (the "local" cluster here) must have dropped its local
+                // policies while still holding the global ones, including the global publish rate.
+                Optional<TopicPolicies> localPolicies1 = pulsar1.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(topic), LOCAL_ONLY).join();
+                assertTrue(localPolicies1.isEmpty(), "Local cluster should have deleted local policies.");
+                Optional<TopicPolicies> globalPolicies1 = pulsar1.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(topic), GLOBAL_ONLY).join();
+                assertTrue(globalPolicies1.isPresent(), "Local cluster should have global policies.");
+                // The value under test comes from pulsar1, so the failure message must name the local cluster.
+                assertEquals(globalPolicies1.get().getPublishRate(), publishRateAddGlobal,
+                        "Local cluster should have global policies: publish rate.");
+            }
Optional<TopicPolicies> globalPolicies2 =
pulsar2.getTopicPoliciesService()
.getTopicPoliciesAsync(TopicName.get(topic),
GLOBAL_ONLY).join();
assertTrue(globalPolicies2.isPresent(), "Remote cluster should
have global policies.");
@@ -265,7 +308,11 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
});
// cleanup.
+ if ("topic".equals(removeClusterLevel)) {
+ admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
+ }
admin2.topics().deletePartitionedTopic(topic);
+ assertEquals(admin2.topics().getList(ns1).size(), 0);
admin2.namespaces().deleteNamespace(ns1);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 49676dc8a08..e14cc5045d6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -27,10 +27,12 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -65,6 +67,211 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test
+ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_123");
+ final String subTopic =
TopicName.get(topicName).getPartition(0).toString();
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ producer1.close();
+ waitReplicatorStarted(subTopic, pulsar2);
+ waitReplicatorStarted(subTopic, pulsar1);
+ Set<String> clustersApplied1 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1.contains(cluster1));
+ assertTrue(clustersApplied1.contains(cluster2));
+ Set<String> clustersApplied2 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied2.contains(cluster1));
+ assertTrue(clustersApplied2.contains(cluster2));
+
+ // Remove topic from a cluster.
+ admin1.topicPolicies(true).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertFalse(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied2a.contains(cluster1));
+ assertFalse(clustersApplied2a.contains(cluster2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global1);
+ assertTrue(global1.contains(cluster1));
+ assertFalse(global1.contains(cluster2));
+
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global2);
+ assertTrue(global2.contains(cluster1));
+ assertFalse(global2.contains(cluster2));
+ });
+ waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
+
+ // Remove global policy.
+ admin1.topicPolicies(true).removeReplicationClusters(topicName);
+ Producer<byte[]> producer2 =
client1.newProducer().topic(topicName).create();
+ producer2.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertTrue(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied2a.contains(cluster1));
+ assertTrue(clustersApplied2a.contains(cluster2));
+
+ Set<String> clusters1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(clusters1));
+ Set<String> clusters2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(clusters2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+ waitReplicatorStarted(subTopic, pulsar1);
+
+ admin1.topics().unload(subTopic);
+ admin2.topics().unload(subTopic);
+ }
+
+ @Override
+ @Test
+ public void testPoliciesOverWrite() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_123");
+ final String subTopic =
TopicName.get(topicName).getPartition(0).toString();
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ producer1.close();
+ waitReplicatorStarted(subTopic, pulsar2);
+ Set<String> clustersApplied1 =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1.contains(cluster1));
+ assertTrue(clustersApplied1.contains(cluster2));
+ // Set clusters for cluster2 to avoid deleting the topic. This feature is
needed for the following situation:
+ // - There are 3 clusters using a shared metadata store.
+ // - The user wants to delete the topic on the cluster "c2", and to stop
replication on the cluster "c3 -> c1".
+ // - The user will apply the following configurations:
+ // - Set a global policy: [c1, c3].
+ // - Set a local policy for the cluster "c3": [c3].
+ admin2.topics().setReplicationClusters(topicName,
Arrays.asList(cluster2));
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied2 =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2.contains(cluster1));
+ assertTrue(clustersApplied2.contains(cluster2));
+ });
+
+
+ // Cluster1: Global policy overwrite namespace policy.
+ // Cluster2: Global policy never overwrite namespace policy.
+ admin1.topicPolicies(true).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertFalse(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2a.contains(cluster1));
+ assertTrue(clustersApplied2a.contains(cluster2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global1);
+ assertTrue(global1.contains(cluster1));
+ assertFalse(global1.contains(cluster2));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertNotNull(global2);
+ assertTrue(global2.contains(cluster1));
+ assertFalse(global2.contains(cluster2));
+ });
+ waitReplicatorStopped(subTopic, pulsar1, pulsar2, false);
+ waitReplicatorStopped(subTopic, pulsar2, pulsar1, false);
+
+ // Remove global policy.
+ admin1.topicPolicies(true).removeReplicationClusters(topicName);
+ Producer<byte[]> producer2 =
client1.newProducer().topic(topicName).create();
+ producer2.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertTrue(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2a.contains(cluster1));
+ assertTrue(clustersApplied2a.contains(cluster2));
+
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+
+ // Cluster1: Local policy overwrite namespace policy.
+ // Cluster2: Global policy never overwrite namespace policy.
+ admin1.topicPolicies(false).setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ Producer<byte[]> producer3 =
client1.newProducer().topic(topicName).create();
+ producer3.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertFalse(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2a.contains(cluster1));
+ assertTrue(clustersApplied2a.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local1));
+ assertTrue(local1.contains(cluster1));
+ assertFalse(local1.contains(cluster2));
+
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+ assertFalse(local2.contains(cluster1));
+ });
+ waitReplicatorStopped(subTopic, false);
+
+ // Remove local policy.
+ admin1.topicPolicies(false).removeReplicationClusters(topicName);
+ Producer<byte[]> producer4 =
client1.newProducer().topic(topicName).create();
+ producer4.close();
+ Awaitility.await().untilAsserted(() -> {
+ Set<String> clustersApplied1a =
admin1.topicPolicies().getReplicationClusters(topicName, true);
+ assertTrue(clustersApplied1a.contains(cluster1));
+ assertTrue(clustersApplied1a.contains(cluster2));
+ Set<String> clustersApplied2a =
admin2.topicPolicies().getReplicationClusters(topicName, true);
+ assertFalse(clustersApplied2a.contains(cluster1));
+ assertTrue(clustersApplied2a.contains(cluster2));
+
+ Set<String> local1 =
admin1.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(local1));
+ Set<String> local2 =
admin2.topicPolicies(false).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isNotEmpty(local2));
+ assertTrue(local2.contains(cluster2));
+
+ Set<String> global1 =
admin1.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global1));
+ Set<String> global2 =
admin2.topicPolicies(true).getReplicationClusters(topicName, false);
+ assertTrue(CollectionUtils.isEmpty(global2));
+ });
+ waitReplicatorStarted(subTopic, pulsar2);
+
+ admin1.topics().unload(subTopic);
+ admin2.topics().unload(subTopic);
+ }
+
@Test(enabled = false)
public void testCreateRemoteConsumerFirst() throws Exception {
super.testReplicatorProducerStatInTopic();
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 4238842bcfa..7a5623f849f 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -1929,4 +1930,13 @@ public interface TopicPolicies {
* Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given
topic asynchronously.
*/
CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistent(String
topic, boolean applied);
+
+ /**
+ * Set the replication clusters for the topic asynchronously.
+ *
+ * @param topic
+ *            Topic name
+ * @param clusterIds
+ *            Ids of the clusters to set as the topic's replication clusters
+ * @return a future for the set request; callers must wait on it to observe completion or failure
+ */
+ CompletableFuture<Void> setReplicationClusters(String topic, List<String>
clusterIds);
+
+ /**
+ * Get the replication clusters for the topic.
+ *
+ * @param topic
+ *            Topic name
+ * @param applied
+ *            Whether to get the applied policy of the topic.
+ *            NOTE(review): presumably {@code true} falls back to broader-scoped settings when the
+ *            topic has no policy of its own - confirm against the broker endpoint
+ * @return the replication cluster ids
+ * @throws PulsarAdminException
+ *             if the admin request fails
+ */
+ Set<String> getReplicationClusters(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * Remove the replication clusters policy of the topic.
+ *
+ * @param topic
+ *            Topic name
+ * @throws PulsarAdminException
+ *             if the admin request fails
+ */
+ void removeReplicationClusters(String topic) throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index f58fd865428..0a4a816640f 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -1281,6 +1282,36 @@ public class TopicPoliciesImpl extends BaseResource
implements TopicPolicies {
return asyncGetRequest(path, new FutureCallback<Boolean>(){});
}
+    /**
+     * Posts the given cluster ids as a JSON body to the topic's {@code replication} resource.
+     *
+     * @param topic topic name; validated before the request is built
+     * @param clusterIds cluster ids sent as the request entity
+     * @return the future returned by the asynchronous POST
+     */
+    @Override
+    public CompletableFuture<Void> setReplicationClusters(String topic, List<String> clusterIds) {
+        final WebTarget replicationPath = topicPath(validateTopic(topic), "replication");
+        final Entity<List<String>> payload = Entity.entity(clusterIds, MediaType.APPLICATION_JSON);
+        return asyncPostRequest(replicationPath, payload);
+    }
+
+    /**
+     * Synchronous variant of {@link #getReplicationClustersAsync(String, boolean)}.
+     *
+     * @param topic topic name
+     * @param applied forwarded unchanged to the asynchronous variant
+     * @return the replication cluster ids produced by the asynchronous call
+     * @throws PulsarAdminException if the underlying request fails
+     */
+    @Override
+    public Set<String> getReplicationClusters(String topic, boolean applied) throws PulsarAdminException {
+        final Set<String> clusters = sync(() -> getReplicationClustersAsync(topic, applied));
+        return clusters;
+    }
+
+    /**
+     * Issues an asynchronous GET on the topic's {@code replication} resource.
+     *
+     * @param topic topic name; validated before the request is built
+     * @param applied sent to the server as the {@code applied} query parameter
+     * @return a future holding the replication cluster ids from the response
+     */
+    public CompletableFuture<Set<String>> getReplicationClustersAsync(String topic, boolean applied) {
+        final WebTarget replicationPath = topicPath(validateTopic(topic), "replication")
+                .queryParam("applied", applied);
+        return asyncGetRequest(replicationPath, new FutureCallback<Set<String>>(){});
+    }
+
+    /**
+     * Synchronous variant of {@link #removeReplicationClustersAsync(String)}.
+     *
+     * @param topic topic name
+     * @throws PulsarAdminException if the underlying request fails
+     */
+    @Override
+    public void removeReplicationClusters(String topic) throws PulsarAdminException {
+        sync(() -> this.removeReplicationClustersAsync(topic));
+    }
+
+    /**
+     * Issues an asynchronous DELETE on the topic's {@code replication} resource.
+     *
+     * @param topic topic name; validated before the request is built
+     * @return the future returned by the asynchronous DELETE
+     */
+    public CompletableFuture<Void> removeReplicationClustersAsync(String topic) {
+        final WebTarget replicationPath = topicPath(validateTopic(topic), "replication");
+        return asyncDeleteRequest(replicationPath);
+    }
+
/*
* returns topic name with encoded Local Name
*/
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 7b7883af9ac..b49c4d40a53 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.admin.cli;
import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -168,6 +169,10 @@ public class CmdTopicPolicies extends CmdBase {
new GetDispatcherPauseOnAckStatePersistent());
addCommand("remove-dispatcher-pause-on-ack-state-persistent",
new RemoveDispatcherPauseOnAckStatePersistent());
+
+ addCommand("get-replication-clusters", new GetReplicationClusters());
+ addCommand("set-replication-clusters", new SetReplicationClusters());
+ addCommand("remove-replication-clusters", new
RemoveReplicationClusters());
}
@Command(description = "Get entry filters for a topic")
@@ -1993,6 +1998,66 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+    /**
+     * CLI command {@code set-replication-clusters}: sets the replication clusters of a topic, either
+     * as a local or (with {@code --global}) as a global topic policy.
+     */
+    @Command(description = "Set the replication clusters for a topic, global policy will be copied to the remote"
+            + " cluster if you enabled namespace level replication.")
+    private class SetReplicationClusters extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        @Option(names = { "--clusters",
+                "-c" }, description = "Replication Cluster Ids list (comma separated values)", required = true)
+        private String clusterIds;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        /**
+         * Validates the topic name, splits {@code --clusters} on commas and applies the policy.
+         *
+         * @throws PulsarAdminException if the admin request fails or the wait is interrupted
+         */
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+            try {
+                // setReplicationClusters only returns a future. Wait for it so that a failure is
+                // reported to the user and the command does not return before the request finishes.
+                getTopicPolicies(isGlobal).setReplicationClusters(persistentTopic, clusters).get();
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for callers further up the stack.
+                Thread.currentThread().interrupt();
+                throw new PulsarAdminException(e);
+            } catch (java.util.concurrent.ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof PulsarAdminException) {
+                    throw (PulsarAdminException) cause;
+                }
+                throw new PulsarAdminException(cause);
+            }
+        }
+    }
+
+    /**
+     * CLI command {@code get-replication-clusters}: prints the replication clusters of a topic.
+     */
+    @Command(description = "Get the replication clusters for a topic")
+    private class GetReplicationClusters extends CliCommand {
+        @Parameters(arity = "1", description = "persistent://tenant/namespace/topic")
+        private String topicName;
+
+        @Option(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic. If set to true,"
+                + " the param \"--global\" will be ignored. ")
+        private boolean applied = false;
+
+        @Option(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+                + "If set \"--applied\" to true, the current param will be ignored. ")
+        private boolean isGlobal = false;
+
+        /**
+         * Validates the topic name, then fetches and prints its replication clusters.
+         */
+        @Override
+        void run() throws PulsarAdminException {
+            final String validatedTopic = validatePersistentTopic(topicName);
+            final TopicPolicies policies = getTopicPolicies(isGlobal);
+            print(policies.getReplicationClusters(validatedTopic, applied));
+        }
+    }
+
+    /**
+     * CLI command {@code remove-replication-clusters}: removes the replication clusters policy of a
+     * topic, either the local or (with {@code --global}) the global one.
+     */
+    // The second literal starts with a space so the help text reads "remote cluster", not "remotecluster".
+    @Command(description = "Remove the replication clusters for a topic, it will not remove the policy from the remote"
+            + " cluster")
+    private class RemoveReplicationClusters extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        // The help text says "remove": this option belongs to the remove command, not the getter.
+        @Option(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        /**
+         * Validates the topic name and removes its replication clusters policy.
+         *
+         * @throws PulsarAdminException if the admin request fails
+         */
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            getTopicPolicies(isGlobal).removeReplicationClusters(persistentTopic);
+        }
+    }
+
/**
 * Returns the topic-policies client obtained from the admin client for the requested scope.
 *
 * @param isGlobal forwarded unchanged to {@code getAdmin().topicPolicies(boolean)}
 */
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}