This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8377396 Fix PR #7818 #7802 does not support admin cli (#7940) 8377396 is described below commit 83773962dda57f43ce46aa3ac625eb0860d0599d Author: feynmanlin <feynman...@tencent.com> AuthorDate: Thu Sep 3 16:37:36 2020 +0800 Fix PR #7818 #7802 does not support admin cli (#7940) ### Motivation PR #7818 #7802 supports topic-level policies. But the pulsar admin cli java doc is not supported accordingly. --- .../pulsar/admin/cli/PulsarAdminToolTest.java | 14 ++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 84 ++++++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a46ee7c..c684329 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -726,6 +726,20 @@ public class PulsarAdminToolTest { cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3); + cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999")); + verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999); + + cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99")); + verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a // range of +/- 1 second of the expected timestamp class TimestampMatcher implements ArgumentMatcher<Long> { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index df0d342..6d18fea 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -127,6 +127,12 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold()); + jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription()); + jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription()); + jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription()); jcommander.addCommand("get-publish-rate", new GetPublishRate()); jcommander.addCommand("set-publish-rate", new SetPublishRate()); jcommander.addCommand("remove-publish-rate", new RemovePublishRate()); @@ -1170,6 +1176,84 @@ public class CmdTopics extends CmdBase { } } + @Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic") + private class GetMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic") + private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic") + private class SetMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true) + private int maxNum; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum); + } + } + + @Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic") + private class GetMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic") + private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic") + private class SetMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true) + private int maxNum; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum); + } + } + @Parameters(commandDescription = "Get compaction threshold for a topic") private class GetCompactionThreshold extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true)