This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a42a29403fc154155be7d9036e7a8529743455bb Author: Rajan Dhabalia <[email protected]> AuthorDate: Mon Feb 24 22:52:08 2025 -0800 [improve][cli] Support additional msg metadata for V1 topic on peek message cmd (#23978) (cherry picked from commit 626b211f91fd8d1e9821ae8e2b9b29520e72ac63) --- .../pulsar/admin/cli/CmdPersistentTopics.java | 24 +----- .../org/apache/pulsar/admin/cli/CmdTopics.java | 94 ++++++++++++---------- 2 files changed, 52 insertions(+), 66 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 3dc0ba7b6f2..7b86e2af7f5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.admin.cli; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.admin.cli.CmdTopics.printMessages; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.netty.buffer.ByteBuf; @@ -37,8 +38,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; @@ -589,26 +588,7 @@ public class CmdPersistentTopics extends CmdBase { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List<Message<byte[]>> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages); - int position = 0; - for (Message<byte[]> msg : messages) { - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - if (msg.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, false, this); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 6da45725233..f19723ec6f1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1117,50 +1117,7 @@ public class CmdTopics extends CmdBase { String persistentTopic = validatePersistentTopic(topicName); List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, showServerMarker, transactionIsolationLevel); - int position = 0; - for (Message<byte[]> msg : messages) { - MessageImpl message = (MessageImpl) msg; - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - - System.out.println("Publish time: " + message.getPublishTime()); - System.out.println("Event time: " + message.getEventTime()); - - if (message.getDeliverAtTime() != 0) { - System.out.println("Deliver at time: " + message.getDeliverAtTime()); - } - MessageMetadata msgMetaData = message.getMessageBuilder(); - if (showServerMarker && msgMetaData.hasMarkerType()) { - System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); - } - - if (message.getBrokerEntryMetadata() != null) { - if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { - System.out.println("Broker entry metadata timestamp: " - + message.getBrokerEntryMetadata().getBrokerTimestamp()); - } - if (message.getBrokerEntryMetadata().hasIndex()) { - System.out.println("Broker entry metadata index: " - + message.getBrokerEntryMetadata().getIndex()); - } - } - - if (message.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, showServerMarker, this); } } @@ -1379,6 +1336,55 @@ public class CmdTopics extends CmdBase { return null; } + public static void printMessages(List<Message<byte[]>> messages, boolean showServerMarker, CliCommand cli) { + if (messages == null) { + return; + } + int position = 0; + for (Message<byte[]> msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (++position != 1) { + System.out.println("-------------------------------------------------------------------------\n"); + } + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + MessageMetadata msgMetaData = message.getMessageBuilder(); + if (showServerMarker && msgMetaData.hasMarkerType()) { + System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + cli.print(msg.getProperties()); + } + ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); + System.out.println(ByteBufUtil.prettyHexDump(data)); + } + } + @Command(description = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)") private class Offload extends CliCommand { @Option(names = { "-s", "--size-threshold" },
