This is an automated email from the ASF dual-hosted git repository. linlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9f599c9 Persistence topic policies support cross multiple clusters (#13483) 9f599c9 is described below commit 9f599c9572e5d9b1f15efa6e895e7eb29b284e57 Author: feynmanlin <feynman...@tencent.com> AuthorDate: Fri Dec 24 15:44:16 2021 +0800 Persistence topic policies support cross multiple clusters (#13483) * Persistence policies support cross multiple clusters --- .../broker/admin/impl/PersistentTopicsBase.java | 15 +++-- .../pulsar/broker/admin/v2/PersistentTopics.java | 9 ++- .../service/ReplicatorTopicPoliciesTest.java | 22 +++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 16 +++++ .../apache/pulsar/admin/cli/CmdTopicPolicies.java | 71 ++++++++++++++++++++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 4 ++ 6 files changed, 128 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 21b8540..f383fc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2999,8 +2999,8 @@ public class PersistentTopicsBase extends AdminResource { }); } - protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied) { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied, boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenApply(op -> op.map(TopicPolicies::getPersistence) .orElseGet(() -> { if (applied) { @@ -3018,23 +3018,26 @@ public class PersistentTopicsBase extends AdminResource { })); } - protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) { + protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies, + boolean isGlobal) { validatePersistencePolicies(persistencePolicies); - return getTopicPoliciesAsyncWithRetry(topicName) + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); topicPolicies.setPersistence(persistencePolicies); + topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); }); } - protected CompletableFuture<Void> internalRemovePersistence() { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture<Void> internalRemovePersistence(boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { if (!op.isPresent()) { return CompletableFuture.completedFuture(null); } op.get().setPersistence(null); + op.get().setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 39b1c03..25fbf83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2039,11 +2039,12 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("applied") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalGetPersistence(applied)) + .thenCompose(__ -> internalGetPersistence(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { handleTopicPolicyException("getPersistence", ex, asyncResponse); @@ -2066,11 +2067,12 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalSetPersistence(persistencePolicies)) + .thenCompose(__ -> internalSetPersistence(persistencePolicies, isGlobal)) .thenRun(() -> { try { log.info("[{}] Successfully updated persistence policies: " @@ -2101,11 +2103,12 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalRemovePersistence()) + .thenCompose(__ -> internalRemovePersistence(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}", clientAppId(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index c8c5553..3c0e3fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; import org.awaitility.Awaitility; @@ -86,6 +87,27 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { } @Test + public void testReplicatePersistentPolicies() throws Exception { + final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); + final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); + init(namespace, topic); + // set PersistencePolicies + PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000); + admin1.topicPolicies(true).setPersistence(topic, policies); + + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.topicPolicies(true).getPersistence(topic), policies)); + Awaitility.await().untilAsserted(() -> + assertEquals(admin3.topicPolicies(true).getPersistence(topic), policies)); + //remove PersistencePolicies + admin1.topicPolicies(true).removePersistence(topic); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies(true).getPersistence(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies(true).getPersistence(topic))); + } + + @Test public void testReplicatorTopicPolicies() throws Exception { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 90ae899..b4adf64 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -900,6 +900,14 @@ public class PulsarAdminToolTest { cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0")); + verify(mockTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1", + new PersistencePolicies(2, 1, 1, 100.0d)); + cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1"); + // Reset the cmd, and check global option cmdTopics = new CmdTopicPolicies(() -> admin); cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g")); @@ -937,6 +945,14 @@ public class PulsarAdminToolTest { cmdTopics = new CmdTopicPolicies(() -> admin); cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1 -t message_age")); verify(mockTopicsPolicies).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.message_age); + + cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0 -g")); + verify(mockGlobalTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1", + new PersistencePolicies(2, 1, 1, 100.0d)); + cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1"); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index ceea877..e54a4e6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.TopicPolicies; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.RelativeTimeUtil; @@ -44,6 +45,10 @@ public class CmdTopicPolicies extends CmdBase { jcommander.addCommand("get-backlog-quota", new GetBacklogQuotaMap()); jcommander.addCommand("set-backlog-quota", new SetBacklogQuota()); jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota()); + + jcommander.addCommand("get-persistence", new GetPersistence()); + jcommander.addCommand("set-persistence", new SetPersistence()); + jcommander.addCommand("remove-persistence", new RemovePersistence()); } @Parameters(commandDescription = "Get the retention policy for a topic") @@ -221,6 +226,72 @@ public class CmdTopicPolicies extends CmdBase { } } + @Parameters(commandDescription = "Get the persistence policies for a topic") + private class GetPersistence extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies", arity = 0) + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getPersistence(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set the persistence policies for a topic") + private class SetPersistence extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "-e", + "--bookkeeper-ensemble" }, description = "Number of bookies to use for a topic", required = true) + private int bookkeeperEnsemble; + + @Parameter(names = { "-w", + "--bookkeeper-write-quorum" }, description = "How many writes to make of each entry", required = true) + private int bookkeeperWriteQuorum; + + @Parameter(names = { "-a", + "--bookkeeper-ack-quorum" }, description = "Number of acks (guaranteed copies) to wait for each entry", required = true) + private int bookkeeperAckQuorum; + + @Parameter(names = { "-r", + "--ml-mark-delete-max-rate" }, description = "Throttling rate of mark-delete operation (0 means no throttle)", required = true) + private double managedLedgerMaxMarkDeleteRate; + + @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. " + + "If set to true, the policy will be replicate to other clusters asynchronously", arity = 0) + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble, + bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate)); + } + } + + @Parameters(commandDescription = "Remove the persistence policy for a topic") + private class RemovePersistence extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. " + + "If set to true, the removing operation will be replicate to other clusters asynchronously" + , arity = 0) + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).removePersistence(persistentTopic); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9bd49ab..c228125 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -273,6 +273,10 @@ public class CmdTopics extends CmdBase { cmdUsageFormatter.addDeprecatedCommand("get-backlog-quotas"); cmdUsageFormatter.addDeprecatedCommand("set-backlog-quota"); cmdUsageFormatter.addDeprecatedCommand("remove-backlog-quota"); + + cmdUsageFormatter.addDeprecatedCommand("get-persistence"); + cmdUsageFormatter.addDeprecatedCommand("set-persistence"); + cmdUsageFormatter.addDeprecatedCommand("remove-persistence"); } }