This is an automated email from the ASF dual-hosted git repository.

linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f599c9  Persistence topic policies support cross multiple clusters 
(#13483)
9f599c9 is described below

commit 9f599c9572e5d9b1f15efa6e895e7eb29b284e57
Author: feynmanlin <feynman...@tencent.com>
AuthorDate: Fri Dec 24 15:44:16 2021 +0800

    Persistence topic policies support cross multiple clusters (#13483)
    
    * Persistence policies support cross multiple clusters
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 15 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  9 ++-
 .../service/ReplicatorTopicPoliciesTest.java       | 22 +++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 16 +++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 71 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  4 ++
 6 files changed, 128 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 21b8540..f383fc7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2999,8 +2999,8 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    protected CompletableFuture<PersistencePolicies> 
internalGetPersistence(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<PersistencePolicies> 
internalGetPersistence(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getPersistence)
                 .orElseGet(() -> {
                     if (applied) {
@@ -3018,23 +3018,26 @@ public class PersistentTopicsBase extends AdminResource 
{
                 }));
     }
 
-    protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies) {
+    protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies,
+                                                             boolean isGlobal) 
{
         validatePersistencePolicies(persistencePolicies);
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setPersistence(persistencePolicies);
+                topicPolicies.setIsGlobal(isGlobal);
                 return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
             });
     }
 
-    protected CompletableFuture<Void> internalRemovePersistence() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemovePersistence(boolean 
isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 if (!op.isPresent()) {
                     return CompletableFuture.completedFuture(null);
                 }
                 op.get().setPersistence(null);
+                op.get().setIsGlobal(isGlobal);
                 return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
             });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 39b1c03..25fbf83 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2039,11 +2039,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetPersistence(applied))
+            .thenCompose(__ -> internalGetPersistence(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getPersistence", ex, 
asyncResponse);
@@ -2066,11 +2067,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Bookkeeper persistence policies for specified 
topic")
             PersistencePolicies persistencePolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetPersistence(persistencePolicies))
+            .thenCompose(__ -> internalSetPersistence(persistencePolicies, 
isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully updated persistence policies: "
@@ -2101,11 +2103,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalRemovePersistence())
+            .thenCompose(__ -> internalRemovePersistence(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove persistence policies: 
namespace={}, topic={}",
                         clientAppId(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index c8c5553..3c0e3fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.awaitility.Awaitility;
@@ -86,6 +87,27 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
     }
 
     @Test
+    public void testReplicatePersistentPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
+        init(namespace, topic);
+        // set PersistencePolicies
+        PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000);
+        admin1.topicPolicies(true).setPersistence(topic, policies);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getPersistence(topic), 
policies));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getPersistence(topic), 
policies));
+        //remove PersistencePolicies
+        admin1.topicPolicies(true).removePersistence(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getPersistence(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getPersistence(topic)));
+    }
+
+    @Test
     public void testReplicatorTopicPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + 
"/topic" + UUID.randomUUID();
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 90ae899..b4adf64 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -900,6 +900,14 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-retention 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-persistence 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 
-e 2 -w 1 -a 1 -r 100.0"));
+        
verify(mockTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1",
+                new PersistencePolicies(2, 1, 1, 100.0d));
+        cmdTopics.run(split("remove-persistence 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+
         // Reset the cmd, and check global option
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 
-g"));
@@ -937,6 +945,14 @@ public class PulsarAdminToolTest {
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("remove-backlog-quota 
persistent://myprop/clust/ns1/ds1 -t message_age"));
         
verify(mockTopicsPolicies).removeBacklogQuota("persistent://myprop/clust/ns1/ds1",
 BacklogQuota.BacklogQuotaType.message_age);
+
+        cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 
-g"));
+        
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 
-e 2 -w 1 -a 1 -r 100.0 -g"));
+        
verify(mockGlobalTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1",
+                new PersistencePolicies(2, 1, 1, 100.0d));
+        cmdTopics.run(split("remove-persistence 
persistent://myprop/clust/ns1/ds1 -g"));
+        
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
     }
 
     @Test
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index ceea877..e54a4e6 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
@@ -44,6 +45,10 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("get-backlog-quota", new GetBacklogQuotaMap());
         jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
         jcommander.addCommand("remove-backlog-quota", new 
RemoveBacklogQuota());
+
+        jcommander.addCommand("get-persistence", new GetPersistence());
+        jcommander.addCommand("set-persistence", new SetPersistence());
+        jcommander.addCommand("remove-persistence", new RemovePersistence());
     }
 
     @Parameters(commandDescription = "Get the retention policy for a topic")
@@ -221,6 +226,72 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the persistence policies for a 
topic")
+    private class GetPersistence extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies", 
arity = 0)
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getPersistence(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the persistence policies for a 
topic")
+    private class SetPersistence extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-e",
+                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic", required = true)
+        private int bookkeeperEnsemble;
+
+        @Parameter(names = { "-w",
+                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry", required = true)
+        private int bookkeeperWriteQuorum;
+
+        @Parameter(names = { "-a",
+                "--bookkeeper-ack-quorum" }, description = "Number of acks 
(guaranteed copies) to wait for each entry", required = true)
+        private int bookkeeperAckQuorum;
+
+        @Parameter(names = { "-r",
+                "--ml-mark-delete-max-rate" }, description = "Throttling rate 
of mark-delete operation (0 means no throttle)", required = true)
+        private double managedLedgerMaxMarkDeleteRate;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
+                + "If set to true, the policy will be replicate to other 
clusters asynchronously", arity = 0)
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setPersistence(persistentTopic, new 
PersistencePolicies(bookkeeperEnsemble,
+                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the persistence policy for a 
topic")
+    private class RemovePersistence extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to 
remove this policy globally. "
+                + "If set to true, the removing operation will be replicate to 
other clusters asynchronously"
+                , arity = 0)
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removePersistence(persistentTopic);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9bd49ab..c228125 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -273,6 +273,10 @@ public class CmdTopics extends CmdBase {
             cmdUsageFormatter.addDeprecatedCommand("get-backlog-quotas");
             cmdUsageFormatter.addDeprecatedCommand("set-backlog-quota");
             cmdUsageFormatter.addDeprecatedCommand("remove-backlog-quota");
+
+            cmdUsageFormatter.addDeprecatedCommand("get-persistence");
+            cmdUsageFormatter.addDeprecatedCommand("set-persistence");
+            cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
         }
     }
 

Reply via email to