This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 66baa8dfac9 [improve][cli] PIP-353: Improve transaction message 
visibility for peek-message (#22762)
66baa8dfac9 is described below

commit 66baa8dfac91795cca030390d67c90a7ae43d0a8
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Tue May 28 12:40:52 2024 +0800

    [improve][cli] PIP-353: Improve transaction message visibility for 
peek-message (#22762)
    
    (cherry picked from commit 20e83b96c3fcf10010977ab785093e105e4e40d8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  30 +++--
 .../broker/admin/v3/AdminApiTransactionTest.java   | 126 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  60 +++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  70 +++++++++---
 .../client/api/TransactionIsolationLevel.java      |  31 +++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   4 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  22 +++-
 7 files changed, 316 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682f41dcdb6..3461eb2d140 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,6 +97,7 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -2707,7 +2708,7 @@ public class PersistentTopicsBase extends AdminResource {
                         @Override
                         public void readEntryComplete(Entry entry, Object ctx) 
{
                             try {
-                                
results.complete(generateResponseWithEntry(entry));
+                                
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
                             } catch (IOException exception) {
                                 throw new RestException(exception);
                             } finally {
@@ -2848,10 +2849,12 @@ public class PersistentTopicsBase extends AdminResource 
{
                     entry = sub.peekNthMessage(messagePosition);
                 }
             }
-            return entry;
-        }).thenCompose(entry -> {
+            return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
+        }).thenCompose(entryTopicPair -> {
+            Entry entry = entryTopicPair.getLeft();
+            PersistentTopic persistentTopic = entryTopicPair.getRight();
             try {
-                Response response = generateResponseWithEntry(entry);
+                Response response = generateResponseWithEntry(entry, 
persistentTopic);
                 return CompletableFuture.completedFuture(response);
             } catch (NullPointerException npe) {
                 throw new RestException(Status.NOT_FOUND, "Message not found");
@@ -2930,17 +2933,18 @@ public class PersistentTopicsBase extends AdminResource 
{
                                 PersistentTopicsBase.this.topicName);
                     }
                 }, null);
-                return future;
+                return future.thenApply(entry -> Pair.of(entry, 
(PersistentTopic) topic));
             } catch (ManagedLedgerException exception) {
                 log.error("[{}] Failed to examine message at position {} from 
{} due to {}", clientAppId(),
                         messagePosition,
                         topicName, exception);
                 throw new RestException(exception);
             }
-
-        }).thenApply(entry -> {
+        }).thenApply(entryTopicPair -> {
+            Entry entry = entryTopicPair.getLeft();
+            PersistentTopic persistentTopic = entryTopicPair.getRight();
             try {
-                return generateResponseWithEntry(entry);
+                return generateResponseWithEntry(entry, persistentTopic);
             } catch (IOException exception) {
                 throw new RestException(exception);
             } finally {
@@ -2951,7 +2955,7 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    private Response generateResponseWithEntry(Entry entry) throws IOException 
{
+    private Response generateResponseWithEntry(Entry entry, PersistentTopic 
persistentTopic) throws IOException {
         checkNotNull(entry);
         PositionImpl pos = (PositionImpl) entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -3069,6 +3073,14 @@ public class PersistentTopicsBase extends AdminResource {
         if (metadata.hasNullPartitionKey()) {
             responseBuilder.header("X-Pulsar-null-partition-key", 
metadata.isNullPartitionKey());
         }
+        if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
+            TxnID txnID = new TxnID(metadata.getTxnidMostBits(), 
metadata.getTxnidLeastBits());
+            boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, 
(PositionImpl) entry.getPosition());
+            responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
+        }
+        boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
+                .compareTo(persistentTopic.getMaxReadPosition()) > 0;
+        responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
 
         // Decode if needed
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index adf810945de..5a192d0159a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.http.HttpStatus;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -48,12 +49,16 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -917,6 +922,127 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testPeekMessageForSkipTxnMarker() throws Exception {
+        initTransaction(1);
+
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        int n = 10;
+        for (int i = 0; i < n; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            producer.newMessage(txn).value("msg").send();
+            txn.commit().get();
+        }
+
+        List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, 
"t-sub", n,
+                false, TransactionIsolationLevel.READ_UNCOMMITTED);
+        assertEquals(peekMsgs.size(), n);
+        for (Message<byte[]> peekMsg : peekMsgs) {
+            assertEquals(new String(peekMsg.getValue()), "msg");
+        }
+    }
+
+    @Test
+    public void testPeekMessageFoReadCommittedMessages() throws Exception {
+        initTransaction(1);
+
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        int n = 10;
+        // Alternately sends `n` committed transactional messages and `n` 
abort transactional messages.
+        for (int i = 0; i < 2 * n; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            if (i % 2 == 0) {
+                producer.newMessage(txn).value("msg").send();
+                txn.commit().get();
+            } else {
+                producer.newMessage(txn).value("msg-aborted").send();
+                txn.abort();
+            } 
+        }
+        // Then sends 1 uncommitted transactional message.
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        producer.newMessage(txn).value("msg-uncommitted").send();
+        // Then sends n-1 no transaction messages.
+        for (int i = 0; i < n - 1; i++) {
+            producer.newMessage().value("msg-after-uncommitted").send();
+        }
+        
+        // peek n message, all messages value should be "msg"
+        {
+            List<Message<byte[]>> peekMsgs = 
admin.topics().peekMessages(topic, "t-sub", n,
+                    false, TransactionIsolationLevel.READ_COMMITTED);
+            assertEquals(peekMsgs.size(), n);
+            for (Message<byte[]> peekMsg : peekMsgs) {
+                assertEquals(new String(peekMsg.getValue()), "msg");
+            }
+        }
+
+        // peek 2 * n messages, and still get n messages, all messages value 
should be "msg"
+        {
+            List<Message<byte[]>> peekMsgs = 
admin.topics().peekMessages(topic, "t-sub", 2 * n,
+                    false, TransactionIsolationLevel.READ_COMMITTED);
+            assertEquals(peekMsgs.size(), n);
+            for (Message<byte[]> peekMsg : peekMsgs) {
+                assertEquals(new String(peekMsg.getValue()), "msg");
+            }
+        }
+    }
+
+    @Test
+    public void testPeekMessageForShowAllMessages() throws Exception {
+        initTransaction(1);
+
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        int n = 10;
+        // Alternately sends `n` committed transactional messages and `n` 
abort transactional messages.
+        for (int i = 0; i < 2 * n; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            if (i % 2 == 0) {
+                producer.newMessage(txn).value("msg").send();
+                txn.commit().get();
+            } else {
+                producer.newMessage(txn).value("msg-aborted").send();
+                txn.abort();
+            }
+        }
+        // Then sends `n` uncommitted transactional messages.
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        for (int i = 0; i < n; i++) {
+            producer.newMessage(txn).value("msg-uncommitted").send();
+        }
+
+        // peek 5 * n message, will get 5 * n msg.
+        List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, 
"t-sub", 5 * n,
+                true, TransactionIsolationLevel.READ_UNCOMMITTED);
+        assertEquals(peekMsgs.size(), 5 * n);
+
+        for (int i = 0; i < 4 * n; i++) {
+            Message<byte[]> peekMsg = peekMsgs.get(i);
+            MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
+            MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
+            if (metadata.hasMarkerType()) {
+                assertTrue(metadata.getMarkerType() == 
MarkerType.TXN_COMMIT_VALUE ||
+                        metadata.getMarkerType() == 
MarkerType.TXN_ABORT_VALUE);
+            } else {
+                String value = new String(peekMsg.getValue());
+                assertTrue(value.equals("msg") || value.equals("msg-aborted"));
+            } 
+        }
+        for (int i = 4 * n; i < peekMsgs.size(); i++) {
+            Message<byte[]> peekMsg = peekMsgs.get(i);
+            assertEquals(new String(peekMsg.getValue()), "msg-uncommitted"); 
+        }
+    }
+
     private static void verifyCoordinatorStats(String state,
                                                long sequenceId, long 
lowWaterMark) {
         assertEquals(state, "Ready");
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 574b859e82c..c681bd1a7bc 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -1653,7 +1654,53 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<Message<byte[]>> peekMessages(String topic, String subName, int 
numMessages) throws PulsarAdminException;
+    default List<Message<byte[]>> peekMessages(String topic, String subName, 
int numMessages)
+            throws PulsarAdminException {
+        return peekMessages(topic, subName, numMessages, false, 
TransactionIsolationLevel.READ_COMMITTED);
+    }
+
+    /**
+     * Peek messages from a topic subscription.
+     *
+     * @param topic
+     *            topic name
+     * @param subName
+     *            Subscription name
+     * @param numMessages
+     *            Number of messages
+     * @param showServerMarker
+     *            Enables the display of internal server write markers
+     * @param transactionIsolationLevel
+     *            Sets the isolation level for peeking messages within 
transactions.
+     *            - 'READ_COMMITTED' allows peeking only committed 
transactional messages.
+     *            - 'READ_UNCOMMITTED' allows peeking all messages,
+     *                                 even transactional messages which have 
been aborted.
+     * @return
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic or subscription does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<Message<byte[]>> peekMessages(String topic, String subName, int 
numMessages,
+                                       boolean showServerMarker, 
TransactionIsolationLevel transactionIsolationLevel)
+            throws PulsarAdminException;
+
+    /**
+     * Peek messages from a topic subscription asynchronously.
+     *
+     * @param topic
+     *            topic name
+     * @param subName
+     *            Subscription name
+     * @param numMessages
+     *            Number of messages
+     * @return a future that can be used to track when the messages are 
returned
+     */
+    default CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String 
topic, String subName, int numMessages) {
+        return peekMessagesAsync(topic, subName, numMessages, false, 
TransactionIsolationLevel.READ_COMMITTED);
+    }
 
     /**
      * Peek messages from a topic subscription asynchronously.
@@ -1664,9 +1711,18 @@ public interface Topics {
      *            Subscription name
      * @param numMessages
      *            Number of messages
+     * @param showServerMarker
+     *            Enables the display of internal server write markers
+     * @param transactionIsolationLevel
+     *            Sets the isolation level for peeking messages within 
transactions.
+     *            - 'READ_COMMITTED' allows peeking only committed 
transactional messages.
+     *            - 'READ_UNCOMMITTED' allows peeking all messages,
+     *                                 even transactional messages which have 
been aborted.
      * @return a future that can be used to track when the messages are 
returned
      */
-    CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, 
String subName, int numMessages);
+    CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
+            String topic, String subName, int numMessages,
+            boolean showServerMarker, TransactionIsolationLevel 
transactionIsolationLevel);
 
     /**
      * Get a message by its messageId via a topic subscription.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f76cfbcde98..b7a8b876640 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -130,6 +131,8 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     private static final String SCHEMA_VERSION = 
"X-Pulsar-Base64-schema-version-b64encoded";
     private static final String ENCRYPTION_PARAM = 
"X-Pulsar-Base64-encryption-param";
     private static final String ENCRYPTION_KEYS = 
"X-Pulsar-Base64-encryption-keys";
+    public static final String TXN_ABORTED = "X-Pulsar-txn-aborted";
+    public static final String TXN_UNCOMMITTED = "X-Pulsar-txn-uncommitted";
     // CHECKSTYLE.ON: MemberName
 
     public static final String PROPERTY_SHADOW_SOURCE_KEY = 
"PULSAR.SHADOW_SOURCE";
@@ -867,7 +870,9 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
     }
 
-    private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String 
topic, String subName, int messagePosition) {
+    private CompletableFuture<List<Message<byte[]>>> peekNthMessage(
+            String topic, String subName, int messagePosition, boolean 
showServerMarker,
+            TransactionIsolationLevel transactionIsolationLevel) {
         TopicName tn = validateTopic(topic);
         String encodedSubName = Codec.encode(subName);
         WebTarget path = topicPath(tn, "subscription", encodedSubName,
@@ -879,7 +884,8 @@ public class TopicsImpl extends BaseResource implements 
Topics {
                     @Override
                     public void completed(Response response) {
                         try {
-                            
future.complete(getMessagesFromHttpResponse(tn.toString(), response));
+                            
future.complete(getMessagesFromHttpResponse(tn.toString(), response,
+                                    showServerMarker, 
transactionIsolationLevel));
                         } catch (Exception e) {
                             future.completeExceptionally(getApiException(e));
                         }
@@ -894,28 +900,35 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
-    public List<Message<byte[]>> peekMessages(String topic, String subName, 
int numMessages)
+    public List<Message<byte[]>> peekMessages(String topic, String subName, 
int numMessages,
+                                              boolean showServerMarker,
+                                              TransactionIsolationLevel 
transactionIsolationLevel)
             throws PulsarAdminException {
-        return sync(() -> peekMessagesAsync(topic, subName, numMessages));
+        return sync(() -> peekMessagesAsync(topic, subName, numMessages, 
showServerMarker, transactionIsolationLevel));
     }
 
     @Override
-    public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String 
topic, String subName, int numMessages) {
+    public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
+            String topic, String subName, int numMessages,
+            boolean showServerMarker, TransactionIsolationLevel 
transactionIsolationLevel) {
         checkArgument(numMessages > 0);
         CompletableFuture<List<Message<byte[]>>> future = new 
CompletableFuture<List<Message<byte[]>>>();
-        peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), 
future, 1);
+        peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(),
+                future, 1, showServerMarker, transactionIsolationLevel);
         return future;
     }
 
     private void peekMessagesAsync(String topic, String subName, int 
numMessages,
-            List<Message<byte[]>> messages, 
CompletableFuture<List<Message<byte[]>>> future, int nthMessage) {
+            List<Message<byte[]>> messages, 
CompletableFuture<List<Message<byte[]>>> future, int nthMessage,
+            boolean showServerMarker, TransactionIsolationLevel 
transactionIsolationLevel) {
         if (numMessages <= 0) {
             future.complete(messages);
             return;
         }
 
         // if peeking first message succeeds, we know that the topic and 
subscription exists
-        peekNthMessage(topic, subName, nthMessage).handle((r, ex) -> {
+        peekNthMessage(topic, subName, nthMessage, showServerMarker, 
transactionIsolationLevel)
+                .handle((r, ex) -> {
             if (ex != null) {
                 // if we get a not found exception, it means that the position 
for the message we are trying to get
                 // does not exist. At this point, we can return the already 
found messages.
@@ -930,7 +943,8 @@ public class TopicsImpl extends BaseResource implements 
Topics {
             for (int i = 0; i < Math.min(r.size(), numMessages); i++) {
                 messages.add(r.get(i));
             }
-            peekMessagesAsync(topic, subName, numMessages - r.size(), 
messages, future, nthMessage + 1);
+            peekMessagesAsync(topic, subName, numMessages - r.size(), 
messages, future,
+                    nthMessage + 1, showServerMarker, 
transactionIsolationLevel);
             return null;
         });
     }
@@ -1253,6 +1267,13 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, 
Response response) throws Exception {
+        return getMessagesFromHttpResponse(topic, response, true,
+                TransactionIsolationLevel.READ_UNCOMMITTED);
+    }
+
+    private List<Message<byte[]>> getMessagesFromHttpResponse(
+            String topic, Response response, boolean showServerMarker,
+            TransactionIsolationLevel transactionIsolationLevel) throws 
Exception {
 
         if (response.getStatus() != Status.OK.getStatusCode()) {
             throw getApiException(response);
@@ -1284,7 +1305,32 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
             Map<String, String> properties = new TreeMap<>();
             MultivaluedMap<String, Object> headers = response.getHeaders();
-            Object tmp = headers.getFirst(PUBLISH_TIME);
+            Object tmp = headers.getFirst(MARKER_TYPE);
+            if (tmp != null) {
+                if (!showServerMarker) {
+                    return new ArrayList<>();
+                } else {
+                    
messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+                }
+            }
+
+            tmp = headers.getFirst(TXN_ABORTED);
+            if (tmp != null && Boolean.parseBoolean(tmp.toString())) {
+                properties.put(TXN_ABORTED, tmp.toString());
+                if (transactionIsolationLevel == 
TransactionIsolationLevel.READ_COMMITTED) {
+                    return new ArrayList<>();
+                }
+            }
+
+            tmp = headers.getFirst(TXN_UNCOMMITTED);
+            if (tmp != null && Boolean.parseBoolean(tmp.toString())) {
+                properties.put(TXN_UNCOMMITTED, tmp.toString());
+                if (transactionIsolationLevel == 
TransactionIsolationLevel.READ_COMMITTED) {
+                    return new ArrayList<>();
+                }
+            }
+
+            tmp = headers.getFirst(PUBLISH_TIME);
             if (tmp != null) {
                 
messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString()));
             }
@@ -1336,10 +1382,6 @@ public class TopicsImpl extends BaseResource implements 
Topics {
             if (tmp != null) {
                 
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
             }
-            tmp = headers.getFirst(MARKER_TYPE);
-            if (tmp != null) {
-                
messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
-            }
             tmp = headers.getFirst(TXNID_LEAST_BITS);
             if (tmp != null) {
                 
messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java
new file mode 100644
index 00000000000..ae385b20232
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum TransactionIsolationLevel {
+    // Consumer can only consume transactional messages which have been 
committed.
+    READ_COMMITTED,
+    // Consumer can consume all messages, even transactional messages which 
have been aborted.
+    READ_UNCOMMITTED;
+}
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index fd1bdf47998..a3b1fa075cf 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -1744,7 +1745,8 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", 
true);
 
         cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 
-s sub1 -n 3"));
-        verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", 
"sub1", 3);
+        verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", 
"sub1", 3,
+                false, TransactionIsolationLevel.READ_COMMITTED);
 
         MessageImpl message = mock(MessageImpl.class);
         when(message.getData()).thenReturn(new byte[]{});
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e1e85c68f7e..261bd81a5b7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -62,9 +62,12 @@ import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1097,10 +1100,23 @@ public class CmdTopics extends CmdBase {
         @Option(names = { "-n", "--count" }, description = "Number of messages 
(default 1)", required = false)
         private int numMessages = 1;
 
+        @Option(names = { "-ssm", "--show-server-marker" },
+                description = "Enables the display of internal server write 
markers.", required = false)
+        private boolean showServerMarker = false;
+
+        @Option(names = { "-til", "--transaction-isolation-level" },
+                description = "Sets the isolation level for peeking messages 
within transactions. "
+                   + "'READ_COMMITTED' allows peeking only committed 
transactional messages. "
+                   + "'READ_UNCOMMITTED' allows peeking all messages, "
+                        + "even transactional messages which have been 
aborted.",
+                required = false)
+        private TransactionIsolationLevel transactionIsolationLevel = 
TransactionIsolationLevel.READ_COMMITTED;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
-            List<Message<byte[]>> messages = 
getTopics().peekMessages(persistentTopic, subName, numMessages);
+            List<Message<byte[]>> messages = 
getTopics().peekMessages(persistentTopic, subName, numMessages,
+                    showServerMarker, transactionIsolationLevel);
             int position = 0;
             for (Message<byte[]> msg : messages) {
                 MessageImpl message = (MessageImpl) msg;
@@ -1122,6 +1138,10 @@ public class CmdTopics extends CmdBase {
                 if (message.getDeliverAtTime() != 0) {
                     System.out.println("Deliver at time: " + 
message.getDeliverAtTime());
                 }
+                MessageMetadata msgMetaData = message.getMessageBuilder();
+                if (showServerMarker && msgMetaData.hasMarkerType()) {
+                    System.out.println("Marker Type: " + 
MarkerType.valueOf(msgMetaData.getMarkerType()));
+                }
 
                 if (message.getBrokerEntryMetadata() != null) {
                     if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) 
{

Reply via email to