This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new afe4792  [Issue 11440]. Add complete metadata for 
admin.topics().examineMessages (#11443)
afe4792 is described below

commit afe47926b904033c304256d96ade5e6214c51bbd
Author: Jason918 <jason....@qq.com>
AuthorDate: Thu Jul 29 11:41:07 2021 +0800

    [Issue 11440]. Add complete metadata for admin.topics().examineMessages 
(#11443)
    
    Fixes #11440
    
    ### Motivation
    
    see issue 11440
    
    ### Modifications
    
    Add all the other non-empty metadata fields as HTTP response headers in 
`org.apache.pulsar.broker.admin.impl.PersistentTopicsBase#generateResponseWithEntry`.
 Fields of type byte[] are Base64-encoded by the broker and Base64-decoded 
by the admin client.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  73 ++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  45 +++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 125 ++++++++++++++++++++-
 site2/docs/admin-api-topics.md                     |  24 ++++
 4 files changed, 265 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8aa92e0..e372ba8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,6 +95,7 @@ import 
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -2571,6 +2573,77 @@ public class PersistentTopicsBase extends AdminResource {
         }
         responseBuilder.header("X-Pulsar-Is-Encrypted", 
metadata.getEncryptionKeysCount() > 0);
 
+        if (metadata.hasProducerName()) {
+            responseBuilder.header("X-Pulsar-producer-name", 
metadata.getProducerName());
+        }
+        if (metadata.hasSequenceId()) {
+            responseBuilder.header("X-Pulsar-sequence-id", 
metadata.getSequenceId());
+        }
+        if (metadata.hasReplicatedFrom()) {
+            responseBuilder.header("X-Pulsar-replicated-from", 
metadata.getReplicatedFrom());
+        }
+        for (String replicatedTo : metadata.getReplicateTosList()) {
+            responseBuilder.header("X-Pulsar-replicated-to", replicatedTo);
+        }
+        if (metadata.hasPartitionKey()) {
+            responseBuilder.header("X-Pulsar-partition-key", 
metadata.getPartitionKey());
+        }
+        if (metadata.hasCompression()) {
+            responseBuilder.header("X-Pulsar-compression", 
metadata.getCompression());
+        }
+        if (metadata.hasUncompressedSize()) {
+            responseBuilder.header("X-Pulsar-uncompressed-size", 
metadata.getUncompressedSize());
+        }
+        if (metadata.hasEncryptionAlgo()) {
+            responseBuilder.header("X-Pulsar-encryption-algo", 
metadata.getEncryptionAlgo());
+        }
+        for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) 
{
+            responseBuilder.header("X-Pulsar-Base64-encryption-keys",
+                    
Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
+        }
+        if (metadata.hasEncryptionParam()) {
+            responseBuilder.header("X-Pulsar-Base64-encryption-param",
+                    
Base64.getEncoder().encodeToString(metadata.getEncryptionParam()));
+        }
+        if (metadata.hasSchemaVersion()) {
+            responseBuilder.header("X-Pulsar-Base64-schema-version",
+                    
Base64.getEncoder().encodeToString(metadata.getSchemaVersion()));
+        }
+        if (metadata.hasPartitionKeyB64Encoded()) {
+            responseBuilder.header("X-Pulsar-partition-key-b64-encoded", 
metadata.isPartitionKeyB64Encoded());
+        }
+        if (metadata.hasOrderingKey()) {
+            responseBuilder.header("X-Pulsar-Base64-ordering-key",
+                    
Base64.getEncoder().encodeToString(metadata.getOrderingKey()));
+        }
+        if (metadata.hasMarkerType()) {
+            responseBuilder.header("X-Pulsar-marker-type", 
metadata.getMarkerType());
+        }
+        if (metadata.hasTxnidLeastBits()) {
+            responseBuilder.header("X-Pulsar-txnid-least-bits", 
metadata.getTxnidLeastBits());
+        }
+        if (metadata.hasTxnidMostBits()) {
+            responseBuilder.header("X-Pulsar-txnid-most-bits", 
metadata.getTxnidMostBits());
+        }
+        if (metadata.hasHighestSequenceId()) {
+            responseBuilder.header("X-Pulsar-highest-sequence-id", 
metadata.getHighestSequenceId());
+        }
+        if (metadata.hasUuid()) {
+            responseBuilder.header("X-Pulsar-uuid", metadata.getUuid());
+        }
+        if (metadata.hasNumChunksFromMsg()) {
+            responseBuilder.header("X-Pulsar-num-chunks-from-msg", 
metadata.getNumChunksFromMsg());
+        }
+        if (metadata.hasTotalChunkMsgSize()) {
+            responseBuilder.header("X-Pulsar-total-chunk-msg-size", 
metadata.getTotalChunkMsgSize());
+        }
+        if (metadata.hasChunkId()) {
+            responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId());
+        }
+        if (metadata.hasNullPartitionKey()) {
+            responseBuilder.header("X-Pulsar-null-partition-key", 
metadata.isNullPartitionKey());
+        }
+
         // Decode if needed
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
         ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, 
metadata.getUncompressedSize());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 9ad32a3..c089dfe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -46,7 +46,6 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
@@ -57,6 +56,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -64,6 +64,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -720,6 +721,48 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testExamineMessageMetadata() throws Exception {
+        TenantInfoImpl tenantInfo = new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", 
Sets.newHashSet("test"));
+        final String topicName = 
"persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata";
+
+        admin.topics().createPartitionedTopic(topicName, 2);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testExamineMessageMetadataProducer")
+                .compressionType(CompressionType.LZ4)
+                .topic(topicName + "-partition-0")
+                .create();
+
+        producer.newMessage()
+                .keyBytes("partition123".getBytes())
+                .orderingKey(new byte[]{0})
+                .replicationClusters(Lists.newArrayList("a", "b"))
+                .sequenceId(112233)
+                .value("data")
+                .send();
+
+        MessageImpl<byte[]> message = (MessageImpl<byte[]>) 
admin.topics().examineMessage(
+                topicName + "-partition-0", "earliest", 1);
+
+        //test long
+        Assert.assertEquals(112233, message.getSequenceId());
+        //test byte[]
+        Assert.assertEquals(new byte[]{0}, message.getOrderingKey());
+        //test bool and byte[]
+        Assert.assertEquals("partition123".getBytes(), message.getKeyBytes());
+        Assert.assertTrue(message.hasBase64EncodedKey());
+        //test arrays
+        Assert.assertEquals(Lists.newArrayList("a", "b"), 
message.getReplicateTo());
+        //test string
+        Assert.assertEquals(producer.getProducerName(), 
message.getProducerName());
+        //test enum
+        Assert.assertEquals(CompressionType.LZ4.ordinal(), 
message.getMessageBuilder().getCompression().ordinal());
+
+        Assert.assertEquals("data", new String(message.getData()));
+    }
+
+    @Test
     public void testOffloadWithNullMessageId() {
         final String topicName = "topic-123";
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName, true);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f611358..b5af896 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +60,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
@@ -100,7 +103,29 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     private static final String EVENT_TIME = "X-Pulsar-event-time";
     private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time";
     private static final String BROKER_ENTRY_TIMESTAMP = 
"X-Pulsar-Broker-Entry-METADATA-timestamp";
-    private static final String BROKER_ENTRY_INDEX =  
"X-Pulsar-Broker-Entry-METADATA-index";
+    private static final String BROKER_ENTRY_INDEX = 
"X-Pulsar-Broker-Entry-METADATA-index";
+    private static final String PRODUCER_NAME = "X-Pulsar-producer-name";
+    private static final String SEQUENCE_ID = "X-Pulsar-sequence-id";
+    private static final String REPLICATED_FROM = "X-Pulsar-replicated-from";
+    private static final String PARTITION_KEY = "X-Pulsar-partition-key";
+    private static final String COMPRESSION = "X-Pulsar-compression";
+    private static final String UNCOMPRESSED_SIZE = 
"X-Pulsar-uncompressed-size";
+    private static final String ENCRYPTION_ALGO = "X-Pulsar-encryption-algo";
+    private static final String MARKER_TYPE = "X-Pulsar-marker-type";
+    private static final String TXNID_LEAST_BITS = "X-Pulsar-txnid-least-bits";
+    private static final String TXNID_MOST_BITS = "X-Pulsar-txnid-most-bits";
+    private static final String HIGHEST_SEQUENCE_ID = 
"X-Pulsar-highest-sequence-id";
+    private static final String UUID = "X-Pulsar-uuid";
+    private static final String NUM_CHUNKS_FROM_MSG = 
"X-Pulsar-num-chunks-from-msg";
+    private static final String TOTAL_CHUNK_MSG_SIZE = 
"X-Pulsar-total-chunk-msg-size";
+    private static final String CHUNK_ID = "X-Pulsar-chunk-id";
+    private static final String PARTITION_KEY_B64_ENCODED = 
"X-Pulsar-partition-key-b64-encoded";
+    private static final String NULL_PARTITION_KEY = 
"X-Pulsar-null-partition-key";
+    private static final String REPLICATED_TO = "X-Pulsar-replicated-to";
+    private static final String ORDERING_KEY = "X-Pulsar-Base64-ordering-key";
+    private static final String SCHEMA_VERSION = 
"X-Pulsar-Base64-schema-version-b64encoded";
+    private static final String ENCRYPTION_PARAM = 
"X-Pulsar-Base64-encryption-param";
+    private static final String ENCRYPTION_KEYS = 
"X-Pulsar-Base64-encryption-keys";
     // CHECKSTYLE.ON: MemberName
 
     public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
@@ -1555,6 +1580,104 @@ public class TopicsImpl extends BaseResource implements 
Topics {
                 
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
             }
 
+            tmp = headers.getFirst(PRODUCER_NAME);
+            if (tmp != null) {
+                messageMetadata.setProducerName(tmp.toString());
+            }
+            tmp = headers.getFirst(SEQUENCE_ID);
+            if (tmp != null) {
+                messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
+            }
+            tmp = headers.getFirst(REPLICATED_FROM);
+            if (tmp != null) {
+                messageMetadata.setReplicatedFrom(tmp.toString());
+            }
+            tmp = headers.getFirst(PARTITION_KEY);
+            if (tmp != null) {
+                messageMetadata.setPartitionKey(tmp.toString());
+            }
+            tmp = headers.getFirst(COMPRESSION);
+            if (tmp != null) {
+                
messageMetadata.setCompression(CompressionType.valueOf(tmp.toString()));
+            }
+            tmp = headers.getFirst(UNCOMPRESSED_SIZE);
+            if (tmp != null) {
+                
messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
+            }
+            tmp = headers.getFirst(ENCRYPTION_ALGO);
+            if (tmp != null) {
+                messageMetadata.setEncryptionAlgo(tmp.toString());
+            }
+            tmp = headers.getFirst(PARTITION_KEY_B64_ENCODED);
+            if (tmp != null) {
+                
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
+            }
+            tmp = headers.getFirst(MARKER_TYPE);
+            if (tmp != null) {
+                
messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+            }
+            tmp = headers.getFirst(TXNID_LEAST_BITS);
+            if (tmp != null) {
+                
messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
+            }
+            tmp = headers.getFirst(TXNID_MOST_BITS);
+            if (tmp != null) {
+                
messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
+            }
+            tmp = headers.getFirst(HIGHEST_SEQUENCE_ID);
+            if (tmp != null) {
+                
messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
+            }
+            tmp = headers.getFirst(UUID);
+            if (tmp != null) {
+                messageMetadata.setUuid(tmp.toString());
+            }
+            tmp = headers.getFirst(NUM_CHUNKS_FROM_MSG);
+            if (tmp != null) {
+                
messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
+            }
+            tmp = headers.getFirst(TOTAL_CHUNK_MSG_SIZE);
+            if (tmp != null) {
+                
messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
+            }
+            tmp = headers.getFirst(CHUNK_ID);
+            if (tmp != null) {
+                messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
+            }
+            tmp = headers.getFirst(NULL_PARTITION_KEY);
+            if (tmp != null) {
+                
messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
+            }
+            tmp = headers.getFirst(ENCRYPTION_PARAM);
+            if (tmp != null) {
+                
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+            }
+            tmp = headers.getFirst(ORDERING_KEY);
+            if (tmp != null) {
+                
messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
+            }
+            tmp = headers.getFirst(SCHEMA_VERSION);
+            if (tmp != null) {
+                
messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
+            }
+            tmp = headers.getFirst(ENCRYPTION_PARAM);
+            if (tmp != null) {
+                
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+            }
+            List<Object> tmpList = headers.get(REPLICATED_TO);
+            if (tmpList != null) {
+                for (Object o : tmpList) {
+                    messageMetadata.addReplicateTo(o.toString());
+                }
+            }
+            tmpList = headers.get(ENCRYPTION_KEYS);
+            if (tmpList != null) {
+                for (Object o : tmpList) {
+                    EncryptionKeys encryptionKey = 
messageMetadata.addEncryptionKey();
+                    
encryptionKey.parseFrom(Base64.getDecoder().decode(o.toString()));
+                }
+            }
+
             tmp = headers.getFirst(BATCH_SIZE_HEADER);
             if (tmp != null) {
                 properties.put(BATCH_SIZE_HEADER, (String) tmp);
diff --git a/site2/docs/admin-api-topics.md b/site2/docs/admin-api-topics.md
index 8904cd1..bf46e86 100644
--- a/site2/docs/admin-api-topics.md
+++ b/site2/docs/admin-api-topics.md
@@ -688,6 +688,30 @@ admin.topics().getMessageById(topic, ledgerId, entryId);
 
 <!--END_DOCUSAURUS_CODE_TABS-->
 
+### Examine messages
+
+You can examine a specific message on a topic by position relative to the 
earliest or the latest message.
+
+<!--DOCUSAURUS_CODE_TABS-->
+<!--pulsar-admin-->
+```shell
+./bin/pulsar-admin topics examine-message \
+  persistent://public/default/my-topic \
+  -i latest -m 1
+```
+
+<!--REST API-->
+{@inject: 
endpoint|GET|/admin/v2/:schema/:tenant/:namespace/:topic/examinemessage?initialPosition=:initialPosition&messagePosition=:messagePosition|operation/examineMessage?version=[[pulsar:version_number]]}
+
+<!--Java-->
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.topics().examineMessage(topic, "latest", 1);
+```
+
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
 ### Get message ID 
 
 You can get message ID published at or just after the given datetime.

Reply via email to