This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit faa4c75b461d41d03cec77eea80d88e54c23eb17
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 22 18:05:35 2025 +0800

    [improve][client] Add `startTimestamp` and `endTimestamp` for consuming 
message in client cli (#24521)
    
    (cherry picked from commit e627c2c2d88b78fea1070750326a8c098bca3207)
---
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 25 +++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 98ca9bc8149..7f9288ee8e8 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -118,6 +118,12 @@ public class CmdConsume extends AbstractCmdConsume {
     @Option(names = { "-mp", "--print-metadata" }, description = "Message 
metadata")
     private boolean printMetadata = false;
 
+    @Option(names = { "-stp", "--start-timestamp" }, description = "Start 
timestamp for consuming messages")
+    private long startTimestamp = 0L;
+
+    @Option(names = { "-etp", "--end-timestamp" }, description = "End 
timestamp for consuming messages")
+    private long endTimestamp = Long.MAX_VALUE;
+
     public CmdConsume() {
         // Do nothing
         super();
@@ -139,6 +145,18 @@ public class CmdConsume extends AbstractCmdConsume {
             throw new CommandLine.ParameterException(commandSpec.commandLine(),
                     "Number of messages should be zero or positive.");
         }
+        if (this.startTimestamp < 0) {
+            throw new CommandLine.ParameterException(commandSpec.commandLine(),
+                    "start timestamp should be positive.");
+        }
+        if (this.endTimestamp < 0) {
+            throw new CommandLine.ParameterException(commandSpec.commandLine(),
+                    "end timestamp should be positive.");
+        }
+        if (this.endTimestamp < startTimestamp) {
+            throw new CommandLine.ParameterException(commandSpec.commandLine(),
+                    "end timestamp should larger than start timestamp.");
+        }
 
         if (this.serviceURL.startsWith("ws")) {
             return consumeFromWebSocket(topic);
@@ -188,17 +206,22 @@ public class CmdConsume extends AbstractCmdConsume {
             }
 
             try (Consumer<?> consumer = builder.subscribe();) {
+                if (startTimestamp > 0L) {
+                    consumer.seek(startTimestamp);
+                }
                 RateLimiter limiter = (this.consumeRate > 0) ? 
RateLimiter.create(this.consumeRate) : null;
                 while (this.numMessagesToConsume == 0 || numMessagesConsumed < 
this.numMessagesToConsume) {
                     if (limiter != null) {
                         limiter.acquire();
                     }
-
                     Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
                     if (msg == null) {
                         LOG.debug("No message to consume after waiting for 5 
seconds.");
                     } else {
                         try {
+                            if (msg.getPublishTime() > endTimestamp) {
+                                break;
+                            }
                             numMessagesConsumed += 1;
                             if (!hideContent) {
                                 System.out.println(MESSAGE_BOUNDARY);

Reply via email to