This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit faa4c75b461d41d03cec77eea80d88e54c23eb17 Author: Jiwei Guo <[email protected]> AuthorDate: Tue Jul 22 18:05:35 2025 +0800 [improve][client] Add `startTimestamp` and `endTimestamp` for consuming message in client cli (#24521) (cherry picked from commit e627c2c2d88b78fea1070750326a8c098bca3207) --- .../org/apache/pulsar/client/cli/CmdConsume.java | 25 +++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 98ca9bc8149..7f9288ee8e8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -118,6 +118,12 @@ public class CmdConsume extends AbstractCmdConsume { @Option(names = { "-mp", "--print-metadata" }, description = "Message metadata") private boolean printMetadata = false; + @Option(names = { "-stp", "--start-timestamp" }, description = "Start timestamp for consuming messages") + private long startTimestamp = 0L; + + @Option(names = { "-etp", "--end-timestamp" }, description = "End timestamp for consuming messages") + private long endTimestamp = Long.MAX_VALUE; + public CmdConsume() { // Do nothing super(); @@ -139,6 +145,18 @@ public class CmdConsume extends AbstractCmdConsume { throw new CommandLine.ParameterException(commandSpec.commandLine(), "Number of messages should be zero or positive."); } + if (this.startTimestamp < 0) { + throw new CommandLine.ParameterException(commandSpec.commandLine(), + "start timestamp should be positive."); + } + if (this.endTimestamp < 0) { + throw new CommandLine.ParameterException(commandSpec.commandLine(), + "end timestamp should be positive."); + } + if (this.endTimestamp < startTimestamp) { + throw new CommandLine.ParameterException(commandSpec.commandLine(), + "end timestamp should larger than start timestamp."); + } if (this.serviceURL.startsWith("ws")) { return consumeFromWebSocket(topic); @@ -188,17 +206,22 @@ public class CmdConsume extends AbstractCmdConsume { } try (Consumer<?> consumer = builder.subscribe();) { + if (startTimestamp > 0L) { + consumer.seek(startTimestamp); + } RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null; while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) { if (limiter != null) { limiter.acquire(); } - Message<?> msg = consumer.receive(5, TimeUnit.SECONDS); if (msg == null) { LOG.debug("No message to consume after waiting for 5 seconds."); } else { try { + if (msg.getPublishTime() > endTimestamp) { + break; + } numMessagesConsumed += 1; if (!hideContent) { System.out.println(MESSAGE_BOUNDARY);
