This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 94a9423 cli support for get last message id of a topic (#3251) 94a9423 is described below commit 94a942384b4d1e07fba4dfb1fe2cc0c06bc0a2ea Author: legendtkl <taok...@gmail.com> AuthorDate: Thu Dec 27 00:14:50 2018 +0800 cli support for get last message id of a topic (#3251) This pr is add cli support to get last message id of a topic 1. add cli support for api: /admin/v2/persistent/{tenant}/{namespace}/{topic}/lastMessageId 2. the cli command is like: ./pulsar-admin topics last-message-id topic-name Test Result: ```bash ➜ bin ./pulsar-admin topics last-message-id persistent://public/default/my-topic { "ledgerId" : 10, "entryId" : -1, "partitionIndex" : -1 } ➜ bin ./pulsar-admin topics last-message-id persistent://public/default/my-topi Topic not found Reason: Topic not found ``` --- .../org/apache/pulsar/client/admin/Topics.java | 9 ++++++ .../pulsar/client/admin/internal/TopicsImpl.java | 32 ++++++++++++++++++++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 13 +++++++++ 3 files changed, 54 insertions(+) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index f3f95ac..9c442c7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1017,4 +1017,13 @@ public interface Topics { * @return the status of the offload operation */ OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException; + + /** + * Get the last commit message Id of a topic + * + * @param topic the topic name + * @return + * @throws PulsarAdminException + */ + MessageId getLastMessageId(String topic) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 216275d..5dc84fc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -950,5 +950,37 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics return ret; } + @Override + public MessageId getLastMessageId(String topic) throws PulsarAdminException { + try { + return (MessageIdImpl) getLastMessageIdAsync(topic).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e.getCause()); + } + } + + public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "lastMessageId"); + final CompletableFuture<MessageId> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<MessageIdImpl>() { + + @Override + public void completed(MessageIdImpl response) { + future.complete(response); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 5014f14..ceaebf4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -93,6 +93,7 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("compaction-status", new CompactionStatusCmd()); jcommander.addCommand("offload", new Offload()); jcommander.addCommand("offload-status", new OffloadStatusCmd()); + jcommander.addCommand("last-message-id", new GetLastMessageId()); } @Parameters(commandDescription = "Get the list of topics under a namespace.") @@ -708,4 +709,16 @@ public class CmdTopics extends CmdBase { } } } + + @Parameters(commandDescription = "get the last commit message id of topic") + private class GetLastMessageId extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(topics.getLastMessageId(persistentTopic)); + } + } }