This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a9423  cli support for get last message id of a topic (#3251)
94a9423 is described below

commit 94a942384b4d1e07fba4dfb1fe2cc0c06bc0a2ea
Author: legendtkl <taok...@gmail.com>
AuthorDate: Thu Dec 27 00:14:50 2018 +0800

    cli support for get last message id of a topic (#3251)
    
    This PR adds CLI support for getting the last message id of a topic.
    1. Add CLI support for the API: /admin/v2/persistent/{tenant}/{namespace}/{topic}/lastMessageId
    2. The CLI command looks like: ./pulsar-admin topics last-message-id topic-name
    
    Test Result:
    
    ```bash
    ➜  bin ./pulsar-admin topics last-message-id persistent://public/default/my-topic
    {
      "ledgerId" : 10,
      "entryId" : -1,
      "partitionIndex" : -1
    }
    ➜  bin ./pulsar-admin topics last-message-id persistent://public/default/my-topi
    Topic not found
    
    Reason: Topic not found
    ```
---
 .../org/apache/pulsar/client/admin/Topics.java     |  9 ++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 32 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 13 +++++++++
 3 files changed, 54 insertions(+)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index f3f95ac..9c442c7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1017,4 +1017,13 @@ public interface Topics {
      * @return the status of the offload operation
      */
     OffloadProcessStatus offloadStatus(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Get the last commit message Id of a topic
+     *
+     * @param topic the topic name
+     * @return
+     * @throws PulsarAdminException
+     */
+    MessageId getLastMessageId(String topic) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 216275d..5dc84fc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -950,5 +950,37 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
         return ret;
     }
 
+    @Override
+    public MessageId getLastMessageId(String topic) throws 
PulsarAdminException {
+        try {
+            return (MessageIdImpl) getLastMessageIdAsync(topic).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e.getCause());
+        }
+    }
+
+    public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "lastMessageId");
+        final CompletableFuture<MessageId> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<MessageIdImpl>() {
+
+                    @Override
+                    public void completed(MessageIdImpl response) {
+                        future.complete(response);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 5014f14..ceaebf4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -93,6 +93,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("compaction-status", new CompactionStatusCmd());
         jcommander.addCommand("offload", new Offload());
         jcommander.addCommand("offload-status", new OffloadStatusCmd());
+        jcommander.addCommand("last-message-id", new GetLastMessageId());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a 
namespace.")
@@ -708,4 +709,16 @@ public class CmdTopics extends CmdBase {
             }
         }
     }
+
+    @Parameters(commandDescription = "get the last commit message id of topic")
+    private class GetLastMessageId extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(topics.getLastMessageId(persistentTopic));
+        }
+    }
 }

Reply via email to