This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebfef6de5e2 [optimize][admin]Enhancing Transaction Buffer Stats and 
Introducing TransactionBufferInternalStats API (#20330)
ebfef6de5e2 is described below

commit ebfef6de5e2026aa2226a4776dba07df0897df28
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Jul 22 19:06:04 2023 +0800

    [optimize][admin]Enhancing Transaction Buffer Stats and Introducing 
TransactionBufferInternalStats API (#20330)
    
    Master issue: https://github.com/apache/pulsar/issues/20291
    
    ### Motivation
    
    Our primary goal is to improve the visibility and troubleshooting 
capabilities of the Pulsar Transaction Buffer by providing more detailed 
information about the snapshot stats and system topic internal status.
    
    ### Modifications
    
    1. Enhance the existing TransactionBufferStats by adding information about 
snapshot stats, including the capacity of the current segment, the unsealed 
aborted transaction ID size, and other related data. This will provide better 
visibility and troubleshooting capabilities for the Pulsar Transaction Buffer.
    2. Introduce a new API for obtaining TransactionBufferInternalStats, 
allowing users to access the state of the system topic used for storing 
snapshots. This will facilitate problem investigation and resolution when 
issues arise with the transaction buffer.
---
 .../pulsar/broker/admin/impl/TransactionsBase.java |  69 ++++++++++++-
 .../pulsar/broker/admin/v3/Transactions.java       |  68 ++++++++++---
 .../broker/service/persistent/PersistentTopic.java |   6 +-
 .../transaction/buffer/AbortedTxnProcessor.java    |  11 ++-
 .../transaction/buffer/TransactionBuffer.java      |  17 ++++
 .../buffer/impl/InMemTransactionBuffer.java        |  14 ++-
 .../SingleSnapshotAbortedTxnProcessorImpl.java     |   8 +-
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |  28 +++++-
 .../buffer/impl/TopicTransactionBuffer.java        |  21 +++-
 .../buffer/impl/TransactionBufferDisable.java      |  11 +++
 .../v3/AdminApiTransactionMultiBrokerTest.java     |  46 +++++++++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 107 ++++++++++++++++++++-
 .../SegmentAbortedTxnProcessorTest.java            |  88 ++++++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java |   4 +-
 .../broker/transaction/TransactionTestBase.java    |   3 +-
 .../buffer/TransactionStablePositionTest.java      |   4 +-
 .../apache/pulsar/client/admin/Transactions.java   |  63 ++++++++++--
 ...ansactionBufferStats.java => SegmentStats.java} |  33 ++-----
 ...nsactionBufferStats.java => SegmentsStats.java} |  34 +++----
 ....java => SnapshotSystemTopicInternalStats.java} |  30 +-----
 ...ts.java => TransactionBufferInternalStats.java} |  32 ++----
 .../policies/data/TransactionBufferStats.java      |   9 ++
 .../client/admin/internal/TransactionsImpl.java    |  26 ++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   6 +-
 .../apache/pulsar/admin/cli/CmdTransactions.java   |  22 ++++-
 25 files changed, 619 insertions(+), 141 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index d596cbdd39d..3921334cff3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
 import static javax.ws.rs.core.Response.Status.NOT_FOUND;
 import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
@@ -38,6 +39,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Transactions;
@@ -47,6 +49,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.SnapshotSystemTopicInternalStats;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
 import 
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -170,9 +174,10 @@ public abstract class TransactionsBase extends 
AdminResource {
     }
 
     protected CompletableFuture<TransactionBufferStats> 
internalGetTransactionBufferStats(boolean authoritative,
-                                                                               
           boolean lowWaterMarks) {
+                                                                               
           boolean lowWaterMarks,
+                                                                               
           boolean segmentStats) {
         return getExistingPersistentTopicAsync(authoritative)
-                .thenApply(topic -> 
topic.getTransactionBufferStats(lowWaterMarks));
+                .thenApply(topic -> 
topic.getTransactionBufferStats(lowWaterMarks, segmentStats));
     }
 
     protected CompletableFuture<TransactionPendingAckStats> 
internalGetPendingAckStats(
@@ -431,6 +436,66 @@ public abstract class TransactionsBase extends 
AdminResource {
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    AbortedTxnProcessor.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
AbortedTxnProcessor.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        CompletableFuture<SnapshotSystemTopicInternalStats> 
segmentInternalStatsFuture =
+                                getTxnSnapshotInternalStats(segmentTopic, 
metadata);
+                        TopicName indexTopic = 
TopicName.get(TopicDomain.persistent.toString(),
+                                namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+                        CompletableFuture<SnapshotSystemTopicInternalStats> 
segmentIndexInternalStatsFuture =
+                                getTxnSnapshotInternalStats(indexTopic, 
metadata);
+                        return segmentIndexInternalStatsFuture
+                                .thenCombine(segmentInternalStatsFuture, 
(indexStats, segmentStats) -> {
+                                    
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
+                                    
transactionBufferInternalStats.segmentInternalStats = segmentStats;
+                                    return transactionBufferInternalStats;
+                                });
+                    } else if (snapshotType == 
AbortedTxnProcessor.SnapshotType.Single) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName singleSnapshotTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                        return 
getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
+                                .thenApply(snapshotSystemTopicInternalStats -> 
{
+                                   
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
+                                           snapshotSystemTopicInternalStats;
+                                   return transactionBufferInternalStats;
+                                });
+                    }
+                    return FutureUtil.failedFuture(new 
RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+                            + snapshotType));
+                });
+    }
+
+    private CompletableFuture<SnapshotSystemTopicInternalStats> 
getTxnSnapshotInternalStats(TopicName topicName,
+                                                                               
             boolean metadata) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e) {
+            return FutureUtil.failedFuture(new RestException(e));
+        }
+        return admin.topics().getInternalStatsAsync(topicName.toString(), 
metadata)
+                        .thenApply(persistentTopicInternalStats -> {
+                            SnapshotSystemTopicInternalStats
+                                    snapshotSystemTopicInternalStats = new 
SnapshotSystemTopicInternalStats();
+                            
snapshotSystemTopicInternalStats.managedLedgerInternalStats = 
persistentTopicInternalStats;
+                            snapshotSystemTopicInternalStats.managedLedgerName 
= topicName.getEncodedLocalName();
+                            return snapshotSystemTopicInternalStats;
+                        });
+    }
+
     protected CompletableFuture<PersistentTopic> 
getExistingPersistentTopicAsync(boolean authoritative) {
         return validateTopicOwnershipAsync(topicName, 
authoritative).thenCompose(__ -> {
             CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index aa24dbdcc3a..1bdc2255085 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.admin.impl.TransactionsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.Nullable;
 
 @Path("/transactions")
 @Produces(MediaType.APPLICATION_JSON)
@@ -171,11 +172,13 @@ public class Transactions extends TransactionsBase {
                                           @PathParam("namespace") String 
namespace,
                                           @PathParam("topic") @Encoded String 
encodedTopic,
                                           @QueryParam("lowWaterMarks") 
@DefaultValue("false")
-                                                      boolean lowWaterMarks) {
+                                                      boolean lowWaterMarks,
+                                          @QueryParam("segmentStats") 
@DefaultValue("false")
+                                              boolean segmentStats) {
         try {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
-            internalGetTransactionBufferStats(authoritative, lowWaterMarks)
+            internalGetTransactionBufferStats(authoritative, lowWaterMarks, 
segmentStats)
                     .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
                         if (!isRedirectException(ex)) {
@@ -315,17 +318,58 @@ public class Transactions extends TransactionsBase {
                             log.error("[{}] Failed to get pending ack internal 
stats {}",
                                     clientAppId(), topicName, ex);
                         }
-                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
-                        if (cause instanceof 
BrokerServiceException.ServiceUnitNotReadyException) {
-                            asyncResponse.resume(new 
RestException(SERVICE_UNAVAILABLE, cause));
-                        } else if (cause instanceof 
BrokerServiceException.NotAllowedException) {
-                            asyncResponse.resume(new 
RestException(METHOD_NOT_ALLOWED, cause));
-                        } else if (cause instanceof 
BrokerServiceException.SubscriptionNotFoundException) {
-                            asyncResponse.resume(new RestException(NOT_FOUND, 
cause));
-                        } else {
-                            asyncResponse.resume(new RestException(cause));
+                        return 
resumeAsyncResponseWithBrokerException(asyncResponse, ex);
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
+    }
+
+    @Nullable
+    private Void resumeAsyncResponseWithBrokerException(@Suspended 
AsyncResponse asyncResponse,
+                                                        Throwable ex) {
+        Throwable cause = FutureUtil.unwrapCompletionException(ex);
+        if (cause instanceof 
BrokerServiceException.ServiceUnitNotReadyException) {
+            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, 
cause));
+        } else if (cause instanceof 
BrokerServiceException.NotAllowedException) {
+            asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
+        } else if (cause instanceof 
BrokerServiceException.SubscriptionNotFoundException) {
+            asyncResponse.resume(new RestException(NOT_FOUND, cause));
+        } else {
+            asyncResponse.resume(new RestException(cause));
+        }
+        return null;
+    }
+
+    @GET
+    @Path("/transactionBufferInternalStats/{tenant}/{namespace}/{topic}")
+    @ApiOperation(value = "Get transaction buffer internal stats.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not enable 
transaction"),
+            @ApiResponse(code = 307, message = "Topic is not owned by this 
broker!"),
+            @ApiResponse(code = 405, message = "Transaction buffer don't use 
managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")
+    })
+    public void getTransactionBufferInternalStats(@Suspended final 
AsyncResponse asyncResponse,
+                                                  @QueryParam("authoritative")
+                                                  @DefaultValue("false") 
boolean authoritative,
+                                                  @PathParam("tenant") String 
tenant,
+                                                  @PathParam("namespace") 
String namespace,
+                                                  @PathParam("topic") @Encoded 
String encodedTopic,
+                                                  @QueryParam("metadata") 
@DefaultValue("false") boolean metadata) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetTransactionBufferInternalStats(authoritative, metadata)
+                    .thenAccept(asyncResponse::resume)
+                    .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction buffer 
internal stats {}",
+                                    clientAppId(), topicName, ex);
                         }
-                        return null;
+                        return 
resumeAsyncResponseWithBrokerException(asyncResponse, ex);
                     });
         } catch (Exception ex) {
             resumeAsyncResponseExceptionally(asyncResponse, ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1e055eccc42..e907caa8c30 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3566,7 +3566,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks) {
-        return this.transactionBuffer.getStats(lowWaterMarks);
+        return getTransactionBufferStats(lowWaterMarks, false);
+    }
+
+    public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks, boolean segmentStats) {
+        return this.transactionBuffer.getStats(lowWaterMarks, segmentStats);
     }
 
     public TransactionPendingAckStats getTransactionPendingAckStats(String 
subName, boolean lowWaterMarks) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
index 8223aa12b75..0f06c201a81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
@@ -21,10 +21,16 @@ package org.apache.pulsar.broker.transaction.buffer;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 
 
 public interface AbortedTxnProcessor {
 
+    enum SnapshotType {
+        Single,
+        Segment,
+    }
+
     /**
      * After the transaction buffer writes a transaction aborted marker to the 
topic,
      * the transaction buffer will put the aborted txnID and the aborted 
marker position to AbortedTxnProcessor.
@@ -66,9 +72,10 @@ public interface AbortedTxnProcessor {
 
     /**
      * Get the lastSnapshotTimestamps.
-     * @return the lastSnapshotTimestamps.
+     *
+     * @return a transactionBufferStats with the stats in the 
abortedTxnProcessor.
      */
-    long getLastSnapshotTimestamps();
+    TransactionBufferStats generateSnapshotStats(boolean segmentStats);
 
     CompletableFuture<Void> closeAsync();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 9c32f762996..7eb5d6f789c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -158,12 +158,29 @@ public interface TransactionBuffer {
      */
     PositionImpl getMaxReadPosition();
 
+    /**
+     * Get the snapshot type.
+     *
+     * The snapshot type can be either "Single" or "Segment". In "Single" 
mode, a single snapshot log is used
+     * to record the transaction buffer stats. In "Segment" mode, a snapshot 
segment topic is used to record
+     * the stats, and a separate snapshot segment index topic is used to index 
these stats.
+     *
+     * @return the snapshot type
+     */
+    AbortedTxnProcessor.SnapshotType getSnapshotType();
+
     /**
      * Get transaction in buffer stats.
      * @return the transaction in buffer stats.
      */
     TransactionInBufferStats getTransactionInBufferStats(TxnID txnID);
 
+    /**
+     * Get transaction stats in buffer.
+     * @return the transaction stats in buffer.
+     */
+    TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats);
+
     /**
      * Get transaction stats in buffer.
      * @return the transaction stats in buffer.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 56b49f98efe..bc2dd58a581 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -374,16 +375,27 @@ class InMemTransactionBuffer implements TransactionBuffer 
{
         return PositionImpl.LATEST;
     }
 
+    @Override
+    public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+        return null;
+    }
+
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
         return null;
     }
 
     @Override
-    public TransactionBufferStats getStats(boolean lowWaterMarks) {
+    public TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats) {
         return null;
     }
 
+    @Override
+    public TransactionBufferStats getStats(boolean lowWaterMarks) {
+        return getStats(lowWaterMarks, false);
+    }
+
+
     @Override
     public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 5d582d564ea..967f1f16fef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -173,8 +174,11 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
     }
 
     @Override
-    public long getLastSnapshotTimestamps() {
-        return this.lastSnapshotTimestamps;
+    public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
+        TransactionBufferStats transactionBufferStats = new 
TransactionBufferStats();
+        transactionBufferStats.lastSnapshotTimestamps = 
this.lastSnapshotTimestamps;
+        transactionBufferStats.totalAbortedTransactions = aborts.size();
+        return transactionBufferStats;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index cd6c9c6123a..be1271a155c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -62,6 +62,9 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SegmentStats;
+import org.apache.pulsar.common.policies.data.SegmentsStats;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -116,6 +119,8 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
     private volatile long lastSnapshotTimestamps;
 
+    private volatile long lastTakedSnapshotSegmentTimestamp;
+
     /**
      * The number of the aborted transaction IDs in a segment.
      * This is calculated according to the configured memory size.
@@ -451,9 +456,25 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                 persistentWorker::clearSnapshotSegmentAndIndexes);
     }
 
-    @Override
-    public long getLastSnapshotTimestamps() {
-        return this.lastSnapshotTimestamps;
+    public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
+        TransactionBufferStats transactionBufferStats = new 
TransactionBufferStats();
+        transactionBufferStats.totalAbortedTransactions = this.aborts.size();
+        transactionBufferStats.lastSnapshotTimestamps = 
this.lastSnapshotTimestamps;
+        SegmentsStats segmentsStats = new SegmentsStats();
+        segmentsStats.currentSegmentCapacity = this.snapshotSegmentCapacity;
+        segmentsStats.lastTookSnapshotSegmentTimestamp = 
this.lastTakedSnapshotSegmentTimestamp;
+        segmentsStats.unsealedAbortTxnIDSize = this.unsealedTxnIds.size();
+        segmentsStats.segmentsSize = indexes.size();
+        if (segmentStats) {
+            List<SegmentStats> statsList = new ArrayList<>();
+            segmentIndex.forEach((position, txnID) -> {
+                SegmentStats stats = new SegmentStats(txnID.toString(), 
position.toString());
+                statsList.add(stats);
+            });
+            segmentsStats.segmentStats = statsList;
+        }
+        transactionBufferStats.segmentsStats = segmentsStats;
+        return transactionBufferStats;
     }
 
     @Override
@@ -705,6 +726,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                 
transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
                 return 
segmentWriter.writeAsync(buildKey(this.sequenceID.get()), 
transactionBufferSnapshotSegment);
             }).thenCompose((messageId) -> {
+                lastTakedSnapshotSegmentTimestamp = System.currentTimeMillis();
                 //Build index for this segment
                 TransactionBufferSnapshotIndex index = new 
TransactionBufferSnapshotIndex();
                 
index.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 89a8e95afba..3c13be22086 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -101,6 +101,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
 
+    private final AbortedTxnProcessor.SnapshotType snapshotType;
+
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
@@ -112,8 +114,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
         if 
(topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled())
 {
             snapshotAbortedTxnProcessor = new 
SnapshotSegmentAbortedTxnProcessorImpl(topic);
+            snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
         } else {
             snapshotAbortedTxnProcessor = new 
SingleSnapshotAbortedTxnProcessorImpl(topic);
+            snapshotType = AbortedTxnProcessor.SnapshotType.Single;
         }
         this.recover();
     }
@@ -489,6 +493,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         }
     }
 
+    @Override
+    public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+        return snapshotType;
+    }
+
     @Override
     public PositionImpl getMaxReadPosition() {
         if (checkIfReady() || checkIfNoSnapshot()) {
@@ -509,9 +518,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     }
 
     @Override
-    public TransactionBufferStats getStats(boolean lowWaterMarks) {
-        TransactionBufferStats transactionBufferStats = new 
TransactionBufferStats();
-        transactionBufferStats.lastSnapshotTimestamps = 
this.snapshotAbortedTxnProcessor.getLastSnapshotTimestamps();
+    public TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats) {
+        TransactionBufferStats transactionBufferStats = 
this.snapshotAbortedTxnProcessor
+                .generateSnapshotStats(segmentStats);
+        transactionBufferStats.snapshotType = snapshotType.toString();
         transactionBufferStats.state = this.getState().name();
         transactionBufferStats.maxReadPosition = 
this.maxReadPosition.toString();
         if (lowWaterMarks) {
@@ -524,6 +534,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return transactionBufferStats;
     }
 
+    @Override
+    public TransactionBufferStats getStats(boolean lowWaterMarks) {
+        return getStats(lowWaterMarks, false);
+    }
+
     @Override
     public void run(Timeout timeout) {
         if (checkIfReady()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 22ba8e2d2e8..7c74b52951e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -93,11 +94,21 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
         return PositionImpl.LATEST;
     }
 
+    @Override
+    public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+        return null;
+    }
+
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
         return null;
     }
 
+    @Override
+    public TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats) {
+        return null;
+    }
+
     @Override
     public TransactionBufferStats getStats(boolean lowWaterMarks) {
         return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
index 52aadde7b26..bf51c69fbae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
@@ -19,13 +19,22 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -81,4 +90,41 @@ public class AdminApiTransactionMultiBrokerTest extends 
TransactionTestBase {
             localAdmin.transactions().getCoordinatorInternalStats(i, false);
         }
     }
+
+    @Test
+    public void testGetTransactionBufferInternalStatsInMultiBroker() throws 
Exception {
+        for (int i = 0; i < super.getBrokerCount(); i++) {
+            
getPulsarServiceList().get(i).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        }
+        String topic1 = NAMESPACE1 +  
"/testGetTransactionBufferInternalStatsInMultiBroker";
+        assertTrue(admin.namespaces().getBundles(NAMESPACE1).getNumBundles() > 
1);
+        for (int i = 0; true ; i++) {
+            topic1 = topic1 + i;
+            admin.topics().createNonPartitionedTopic(topic1);
+            String segmentTopicBroker = admin.lookups()
+                    .lookupTopic(NAMESPACE1 + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+            String indexTopicBroker = admin.lookups()
+                    .lookupTopic(NAMESPACE1 + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+            if (segmentTopicBroker.equals(indexTopicBroker)) {
+                String topicBroker = admin.lookups().lookupTopic(topic1);
+                if (!topicBroker.equals(segmentTopicBroker)) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
+        TransactionBufferInternalStats stats = admin.transactions()
+                .getTransactionBufferInternalStatsAsync(topic1, true).get();
+        assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Segment.toString());
+        assertNull(stats.singleSnapshotSystemTopicInternalStats);
+        assertNotNull(stats.segmentInternalStats);
+        assertTrue(stats.segmentInternalStats.managedLedgerName
+                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+        assertNotNull(stats.segmentIndexInternalStats);
+        assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 0e51470da75..1e5f4679492 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -60,7 +61,9 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
 import 
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -510,7 +513,7 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
 
         TransactionCoordinatorInternalStats stats = admin.transactions()
                 .getCoordinatorInternalStatsAsync(0, true).get();
-        
verifyManagedLegerInternalStats(stats.transactionLogStats.managedLedgerInternalStats,
 26);
+        
verifyManagedLedgerInternalStats(stats.transactionLogStats.managedLedgerInternalStats,
 26);
         assertEquals(TopicName.get(TopicDomain.persistent.toString(), 
NamespaceName.SYSTEM_NAMESPACE,
                 MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + 
"0").getPersistenceNamingEncoding(),
                 stats.transactionLogStats.managedLedgerName);
@@ -565,7 +568,7 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
                         + subName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
                 stats.pendingAckLogStats.managedLedgerName);
 
-        verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);
+        verifyManagedLedgerInternalStats(managedLedgerInternalStats, 16);
 
         ManagedLedgerInternalStats finalManagedLedgerInternalStats = 
managedLedgerInternalStats;
         managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
@@ -584,6 +587,88 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
 
+    @Test(timeOut = 20000)
+    public void testGetTransactionBufferInternalStats() throws Exception {
+        // Initialize transaction
+        initTransaction(1);
+
+        // Create topics
+        final String topic1 = 
"persistent://public/default/testGetTransactionBufferInternalStats-1";
+        final String topic2 = 
"persistent://public/default/testGetTransactionBufferInternalStats-2";
+        final String topic3 = 
"persistent://public/default/testGetTransactionBufferInternalStats-3";
+        pulsar.getConfig().setTransactionCoordinatorEnabled(false);
+        admin.topics().createNonPartitionedTopic(topic1);
+
+        // Verify NotFoundException when transaction coordinator is disabled
+        try {
+            
admin.transactions().getTransactionBufferInternalStatsAsync(topic1, true).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
PulsarAdminException.NotFoundException);
+        }
+
+        // Enable transaction coordinator and disable segmented snapshot
+        pulsar.getConfig().setTransactionCoordinatorEnabled(true);
+        pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Send a message with a transaction and abort it
+        Producer<byte[]> producer = 
pulsarClient.newProducer(Schema.BYTES).topic(topic2).create();
+        TransactionImpl transaction = (TransactionImpl) getTransaction();
+        producer.newMessage(transaction).send();
+        transaction.abort().get();
+
+        // Get transaction buffer internal stats and verify single snapshot 
stats
+        TransactionBufferInternalStats stats = admin.transactions()
+                .getTransactionBufferInternalStatsAsync(topic2, true).get();
+        assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Single.toString());
+        assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
+
+        // Get managed ledger internal stats for the transaction buffer 
snapshot topic
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(
+                TopicName.get(topic2).getNamespace() + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
+                internalStats);
+        
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
+                .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+        assertNull(stats.segmentInternalStats);
+        assertNull(stats.segmentIndexInternalStats);
+
+        // Configure segmented snapshot and set segment size
+        pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
+        pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
+        // Send a message with a transaction and abort it
+        producer = 
pulsarClient.newProducer(Schema.BYTES).topic(topic3).create();
+        transaction = (TransactionImpl) getTransaction();
+        producer.newMessage(transaction).send();
+        transaction.abort().get();
+
+        // Get transaction buffer internal stats and verify segmented snapshot 
stats
+        stats = 
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
+        assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Segment.toString());
+        assertNull(stats.singleSnapshotSystemTopicInternalStats);
+        assertNotNull(stats.segmentInternalStats);
+
+        // Get managed ledger internal stats for the transaction buffer 
segments topic
+        internalStats = admin.topics().getInternalStats(
+                TopicName.get(topic2).getNamespace() + "/" +
+                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+        
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
 internalStats);
+        assertTrue(stats.segmentInternalStats.managedLedgerName
+                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+
+        // Get managed ledger internal stats for the transaction buffer 
indexes topic
+        assertNotNull(stats.segmentIndexInternalStats);
+        internalStats = admin.topics().getInternalStats(
+                TopicName.get(topic2).getNamespace() + "/" +
+                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+        
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
 internalStats);
+        assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+    }
+
+
+
     @Test(timeOut = 20000)
     public void testTransactionNotEnabled() throws Exception {
         cleanup();
@@ -836,7 +921,7 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
                 .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
     }
 
-    private static void 
verifyManagedLegerInternalStats(ManagedLedgerInternalStats 
managedLedgerInternalStats,
+    private static void 
verifyManagedLedgerInternalStats(ManagedLedgerInternalStats 
managedLedgerInternalStats,
                                                         long totalSize) {
         assertEquals(managedLedgerInternalStats.entriesAddedCounter, 1);
         assertEquals(managedLedgerInternalStats.numberOfEntries, 1);
@@ -851,4 +936,20 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         assertNotNull(managedLedgerInternalStats.ledgers.get(0).metadata);
         assertEquals(managedLedgerInternalStats.cursors.size(), 1);
     }
+
+    private static void 
verifyManagedLedgerInternalStats(ManagedLedgerInternalStats internalStats,
+                                                         
ManagedLedgerInternalStats persistentTopicStats) {
+        assertEquals(persistentTopicStats.entriesAddedCounter, 
internalStats.entriesAddedCounter);
+        assertEquals(persistentTopicStats.numberOfEntries, 
internalStats.numberOfEntries);
+        assertEquals(persistentTopicStats.totalSize, internalStats.totalSize);
+        assertEquals(persistentTopicStats.currentLedgerEntries, 
internalStats.currentLedgerEntries);
+        assertEquals(persistentTopicStats.currentLedgerSize, 
internalStats.currentLedgerSize);
+        assertEquals(persistentTopicStats.lastLedgerCreationFailureTimestamp, 
internalStats.lastLedgerCreationFailureTimestamp);
+        assertEquals(persistentTopicStats.waitingCursorsCount, 
internalStats.waitingCursorsCount);
+        assertEquals(persistentTopicStats.pendingAddEntriesCount, 
internalStats.pendingAddEntriesCount);
+        assertEquals(persistentTopicStats.lastConfirmedEntry, 
internalStats.lastConfirmedEntry);
+        assertNotNull(internalStats.ledgers.get(0).metadata);
+        assertEquals(persistentTopicStats.ledgers.size(), 
internalStats.ledgers.size());
+        assertEquals(persistentTopicStats.cursors.size(), 
internalStats.cursors.size());
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index 9e1c1992477..6e763cb44fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
 import java.lang.reflect.Field;
 import java.util.LinkedList;
 import java.util.NavigableMap;
@@ -29,6 +31,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -47,8 +50,9 @@ import 
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxn
 import 
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -56,6 +60,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -256,6 +261,85 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         processor.closeAsync().get(5, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void testTxnSegmentStats() throws Exception {
+        // Set up test environment
+        String namespace = TENANT + "/testTxnSegmentStats";
+        String topic = "persistent://" + namespace + "/testTxnSegmentStats";
+        
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
topic.length() + SEGMENT_SIZE * 3);
+
+        // Create necessary resources
+        Transactions transactions = admin.transactions();
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+
+        // Prepare topic, producer, and consumer
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("my-sub").subscribe();
+
+        // Record the start time of the test
+        long testStartTime = System.currentTimeMillis();
+
+        Transaction transaction = null;
+        // Send messages with transactions and abort them
+        for (int i = 0; i < SEGMENT_SIZE; i++) {
+            transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.HOURS).build().get();
+            producer.newMessage(transaction).send();
+            transaction.abort().get();
+        }
+
+        Transaction txn = 
pulsarClient.newTransaction().withTransactionTimeout(5, 
TimeUnit.HOURS).build().get();
+        producer.newMessage(txn).send();
+        txn.abort().get();
+
+        // Get the transaction buffer stats without segment stats
+        TransactionBufferStats statsWithoutSegmentStats = transactions
+                .getTransactionBufferStats(topic, false, false);
+        assertNotNull(statsWithoutSegmentStats);
+        assertNotNull(statsWithoutSegmentStats.segmentsStats);
+        assertNull(statsWithoutSegmentStats.segmentsStats.segmentStats);
+        assertEquals(statsWithoutSegmentStats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Segment.toString());
+
+        // Verify the segment stats
+        assertEquals(statsWithoutSegmentStats.segmentsStats.segmentsSize, 1L);
+        
assertEquals(statsWithoutSegmentStats.segmentsStats.unsealedAbortTxnIDSize, 1L);
+        
assertEquals(statsWithoutSegmentStats.segmentsStats.currentSegmentCapacity, 
SEGMENT_SIZE);
+        assertEquals(statsWithoutSegmentStats.totalAbortedTransactions, 
SEGMENT_SIZE + 1);
+        
assertTrue(statsWithoutSegmentStats.segmentsStats.lastTookSnapshotSegmentTimestamp
 >= testStartTime);
+
+        // Get the transaction buffer stats with segment stats
+        TransactionBufferStats statsWithSegmentStats = transactions
+                .getTransactionBufferStats(topic, false, true);
+        assertNotNull(statsWithSegmentStats);
+        assertNotNull(statsWithSegmentStats.segmentsStats.segmentStats);
+
+        // Verify if the segment stats are present when requested
+        assertEquals(statsWithSegmentStats.segmentsStats.segmentStats.size(), 
1);
+        
assertEquals(statsWithSegmentStats.segmentsStats.segmentStats.get(0).lastTxnID,
+                transaction.getTxnID().toString());
+
+        // Verify multiple segments
+        for (int i = 0; i < SEGMENT_SIZE * 3; i++) {
+            transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.HOURS).build().get();
+            producer.newMessage(transaction).send();
+            transaction.abort().get();
+        }
+        statsWithSegmentStats = transactions.getTransactionBufferStats(topic, 
false, true);
+
+        // Verify the segment stats
+        assertEquals(statsWithSegmentStats.segmentsStats.segmentsSize, 4L);
+        
assertEquals(statsWithSegmentStats.segmentsStats.unsealedAbortTxnIDSize, 1L);
+        assertEquals(statsWithSegmentStats.totalAbortedTransactions, 
SEGMENT_SIZE * 4 + 1);
+
+        // Reset the configuration
+        this.pulsarService.getConfig()
+                .setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() + SEGMENT_SIZE * 3);
+    }
+
     private void verifySnapshotSegmentsSize(String topic, int size) throws 
Exception {
         SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader =
                 pulsarService.getTransactionBufferSnapshotServiceFactory()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 4e50401fc11..cf389824794 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -743,7 +743,7 @@ public class TransactionTest extends TransactionTestBase {
 
         TransactionBuffer buffer2 = new 
TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
-                assertEquals(buffer2.getStats(false).state, "Ready"));
+                assertEquals(buffer2.getStats(false, false).state, "Ready"));
         managedCursors.removeCursor("transaction-buffer-sub");
 
         doAnswer(invocation -> {
@@ -755,7 +755,7 @@ public class TransactionTest extends TransactionTestBase {
         managedCursors.add(managedCursor, 
managedCursor.getMarkDeletedPosition());
         TransactionBuffer buffer3 = new 
TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
-                assertEquals(buffer3.getStats(false).state, "Ready"));
+                assertEquals(buffer3.getStats(false, false).state, "Ready"));
         persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
             assertTrue(internalStats.cursors.isEmpty());
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index cd0c089ad41..c0300c63b35 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -58,6 +58,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
     public static final String CLUSTER_NAME = "test";
 
     @Setter
+    @Getter
     private int brokerCount = 3;
     @Getter
     private final List<ServiceConfiguration> serviceConfigurationList = new 
ArrayList<>();
@@ -116,7 +117,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         createTransactionCoordinatorAssign(numPartitionsOfTC);
         admin.tenants().createTenant(TENANT,
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
+        admin.namespaces().createNamespace(NAMESPACE1, 4);
         if (topic != null) {
             if (numPartitions == 0) {
                 admin.topics().createNonPartitionedTopic(topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 55d115905a3..7493b25ac1d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -235,10 +235,10 @@ public class TransactionStablePositionTest extends 
TransactionTestBase {
         Awaitility.await().until(() -> {
             if (clientEnableTransaction) {
                 // recover success, client enable transaction will change to 
Ready State
-                return 
topicTransactionBuffer.getStats(false).state.equals(Ready.name());
+                return topicTransactionBuffer.getStats(false, 
false).state.equals(Ready.name());
             } else {
                 // recover success, client disable transaction will change to 
NoSnapshot State
-                return 
topicTransactionBuffer.getStats(false).state.equals(NoSnapshot.name());
+                return topicTransactionBuffer.getStats(false, 
false).state.equals(NoSnapshot.name());
             }
         });
     }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 57adf263a57..b0504bee744 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
 import 
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -139,10 +140,24 @@ public interface Transactions {
      * Get transaction buffer stats.
      *
      * @param topic the topic of getting transaction buffer stats
-     * @param  lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction pending ack.
+     * @param lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction buffer.
+     * @param segmentStats Whether to get segment statistics.
      * @return the future stats of transaction buffer in topic.
      */
-    CompletableFuture<TransactionBufferStats> 
getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);
+    CompletableFuture<TransactionBufferStats> 
getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks,
+                                                                             
boolean segmentStats);
+
+    /**
+     * Get transaction buffer stats.
+     *
+     * @param topic the topic of getting transaction buffer stats
+     * @param lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction buffer.
+     * @return the future stats of transaction buffer in topic.
+     */
+    default CompletableFuture<TransactionBufferStats> 
getTransactionBufferStatsAsync(String topic,
+                                                                               
      boolean lowWaterMarks) {
+        return getTransactionBufferStatsAsync(topic, lowWaterMarks, false);
+    }
 
     /**
      * Get transaction buffer stats.
@@ -151,17 +166,31 @@ public interface Transactions {
      * @return the future stats of transaction buffer in topic.
      */
     default CompletableFuture<TransactionBufferStats> 
getTransactionBufferStatsAsync(String topic) {
-        return getTransactionBufferStatsAsync(topic, false);
+        return getTransactionBufferStatsAsync(topic, false, false);
     }
 
     /**
      * Get transaction buffer stats.
      *
      * @param topic the topic of getting transaction buffer stats
-     * @param  lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction buffer.
+     * @param lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction buffer.
+     * @param segmentStats Whether to get segment statistics.
      * @return the stats of transaction buffer in topic.
      */
-    TransactionBufferStats getTransactionBufferStats(String topic, boolean 
lowWaterMarks) throws PulsarAdminException;
+    TransactionBufferStats getTransactionBufferStats(String topic, boolean 
lowWaterMarks,
+                                                     boolean segmentStats) 
throws PulsarAdminException;
+
+    /**
+     * Get transaction buffer stats.
+     *
+     * @param topic the topic of getting transaction buffer stats
+     * @param lowWaterMarks Whether to get information about lowWaterMarks 
stored in transaction buffer.
+     * @return the stats of transaction buffer in topic.
+     */
+    default TransactionBufferStats getTransactionBufferStats(String topic,
+                                                             boolean 
lowWaterMarks) throws PulsarAdminException {
+        return getTransactionBufferStats(topic, lowWaterMarks, false);
+    }
 
     /**
      * Get transaction buffer stats.
@@ -170,7 +199,7 @@ public interface Transactions {
      * @return the stats of transaction buffer in topic.
      */
     default TransactionBufferStats getTransactionBufferStats(String topic) 
throws PulsarAdminException {
-        return getTransactionBufferStats(topic, false);
+        return getTransactionBufferStats(topic, false, false);
     }
 
     /**
@@ -309,6 +338,28 @@ public interface Transactions {
     TransactionPendingAckInternalStats getPendingAckInternalStats(String 
topic, String subName,
                                                                   boolean 
metadata) throws PulsarAdminException;
 
+    /**
+     * Get transaction buffer internal stats asynchronously.
+     *
+     * @param topic the topic to get transaction buffer internal stats from
+     * @param metadata whether to obtain ledger metadata
+     *
+     * @return the future internal stats of transaction buffer
+     */
+    CompletableFuture<TransactionBufferInternalStats> 
getTransactionBufferInternalStatsAsync(String topic,
+                                                                               
              boolean metadata);
+
+    /**
+     * Get transaction buffer internal stats.
+     *
+     * @param topic the topic to get transaction buffer internal stats from
+     * @param metadata whether to obtain ledger metadata
+     *
+     * @return the internal stats of transaction buffer
+     */
+    TransactionBufferInternalStats getTransactionBufferInternalStats(String 
topic,
+                                                                     boolean 
metadata) throws PulsarAdminException;
+
     /**
      * Sets the scale of the transaction coordinators.
      * And currently, we can only support scale-up.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
similarity index 51%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
index 73d66b8c230..007f4f4d632 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
@@ -18,30 +18,15 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-import java.util.Map;
+public class SegmentStats {
+    public String lastTxnID;
+    public String persistentPosition;
 
-public class TransactionBufferStats {
+    public SegmentStats(String lastTxnID, String persistentPosition) {
+        this.lastTxnID = lastTxnID;
+        this.persistentPosition = persistentPosition;
+    }
 
-    /** The state of this transaction buffer. */
-    public String state;
-
-    /** The max read position of this transaction buffer. */
-    public String maxReadPosition;
-
-    /** The last snapshot timestamps of this transaction buffer. */
-    public long lastSnapshotTimestamps;
-
-    /**
-     * (Optional) The lowWaterMark details of the transaction buffer.
-     */
-    public Map<Long, Long> lowWaterMarks;
-    /**
-     * The total number of ongoing transactions in this transaction buffer.
-     */
-    public long ongoingTxnSize;
-
-    //Start timestamp of  transaction buffer recovery. 0L means no startup.
-    public long recoverStartTime;
-    //End timestamp of transaction buffer recovery. 0L means no startup.
-    public long recoverEndTime;
+    public SegmentStats() {
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
similarity index 51%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
index 73d66b8c230..46422c0b67b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
@@ -18,30 +18,20 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-import java.util.Map;
+import java.util.List;
 
-public class TransactionBufferStats {
+public class SegmentsStats {
+    // The current number of the snapshot segments.
+    public long segmentsSize;
 
-    /** The state of this transaction buffer. */
-    public String state;
+    // The capacity of snapshot segment calculated by the current config 
(transactionBufferSnapshotSegmentSize)
+    public long currentSegmentCapacity;
 
-    /** The max read position of this transaction buffer. */
-    public String maxReadPosition;
+    // The number of latest aborted txn IDs not yet sealed into a segment; it is less than currentSegmentCapacity
+    public long unsealedAbortTxnIDSize;
 
-    /** The last snapshot timestamps of this transaction buffer. */
-    public long lastSnapshotTimestamps;
-
-    /**
-     * (Optional) The lowWaterMark details of the transaction buffer.
-     */
-    public Map<Long, Long> lowWaterMarks;
-    /**
-     * The total number of ongoing transactions in this transaction buffer.
-     */
-    public long ongoingTxnSize;
-
-    //Start timestamp of  transaction buffer recovery. 0L means no startup.
-    public long recoverStartTime;
-    //End timestamp of transaction buffer recovery. 0L means no startup.
-    public long recoverEndTime;
+    // A list of individual segment stats
+    public List<SegmentStats> segmentStats;
+    /** The timestamp of the last snapshot segment taken by this transaction buffer. */
+    public long lastTookSnapshotSegmentTimestamp;
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
similarity index 51%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
index 73d66b8c230..7ce95375e72 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
@@ -18,30 +18,10 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-import java.util.Map;
+public class SnapshotSystemTopicInternalStats {
+    // The managed ledger name for the snapshot segment topic or index topic.
+    public String managedLedgerName;
 
-public class TransactionBufferStats {
-
-    /** The state of this transaction buffer. */
-    public String state;
-
-    /** The max read position of this transaction buffer. */
-    public String maxReadPosition;
-
-    /** The last snapshot timestamps of this transaction buffer. */
-    public long lastSnapshotTimestamps;
-
-    /**
-     * (Optional) The lowWaterMark details of the transaction buffer.
-     */
-    public Map<Long, Long> lowWaterMarks;
-    /**
-     * The total number of ongoing transactions in this transaction buffer.
-     */
-    public long ongoingTxnSize;
-
-    //Start timestamp of  transaction buffer recovery. 0L means no startup.
-    public long recoverStartTime;
-    //End timestamp of transaction buffer recovery. 0L means no startup.
-    public long recoverEndTime;
+    // The managed ledger internal stats for the snapshot segment topic or 
index topic.
+    public ManagedLedgerInternalStats managedLedgerInternalStats;
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
similarity index 51%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
index 73d66b8c230..b4c9e096d0a 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
@@ -18,30 +18,16 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-import java.util.Map;
+public class TransactionBufferInternalStats {
+    // The type of snapshot being used: either "Single" or "Segment"
+    public String snapshotType;
 
-public class TransactionBufferStats {
+    // If snapshotType is "Single", this field will provide the statistics of 
the single snapshot log.
+    public SnapshotSystemTopicInternalStats 
singleSnapshotSystemTopicInternalStats;
 
-    /** The state of this transaction buffer. */
-    public String state;
+    // If snapshotType is "Segment", this field will provide the statistics of 
the snapshot segment topic.
+    public SnapshotSystemTopicInternalStats segmentInternalStats;
 
-    /** The max read position of this transaction buffer. */
-    public String maxReadPosition;
-
-    /** The last snapshot timestamps of this transaction buffer. */
-    public long lastSnapshotTimestamps;
-
-    /**
-     * (Optional) The lowWaterMark details of the transaction buffer.
-     */
-    public Map<Long, Long> lowWaterMarks;
-    /**
-     * The total number of ongoing transactions in this transaction buffer.
-     */
-    public long ongoingTxnSize;
-
-    //Start timestamp of  transaction buffer recovery. 0L means no startup.
-    public long recoverStartTime;
-    //End timestamp of transaction buffer recovery. 0L means no startup.
-    public long recoverEndTime;
+    // If snapshotType is "Segment", this field will provide the statistics of 
the snapshot segment index topic.
+    public SnapshotSystemTopicInternalStats segmentIndexInternalStats;
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
index 73d66b8c230..1dffa0dd614 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
@@ -44,4 +44,13 @@ public class TransactionBufferStats {
     public long recoverStartTime;
     //End timestamp of transaction buffer recovery. 0L means no startup.
     public long recoverEndTime;
+
+    // The total number of aborted transactions.
+    public long totalAbortedTransactions;
+
+    // The type of snapshot being used: either "Single" or "Segment"
+    public String snapshotType;
+
+    // If snapshotType is "Segment", this field will provide additional 
segment-related statistics
+    public SegmentsStats segmentsStats;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 5693ebc8f60..2d1dd408ef6 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
 import 
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -132,17 +133,20 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public CompletableFuture<TransactionBufferStats> 
getTransactionBufferStatsAsync(String topic,
-                                                                               
     boolean lowWaterMarks) {
+                                                                               
     boolean lowWaterMarks,
+                                                                               
     boolean segmentStats) {
         WebTarget path = adminV3Transactions.path("transactionBufferStats");
         path = path.path(TopicName.get(topic).getRestPath(false));
         path = path.queryParam("lowWaterMarks", lowWaterMarks);
+        path = path.queryParam("segmentStats", segmentStats);
         return asyncGetRequest(path, new 
FutureCallback<TransactionBufferStats>(){});
     }
 
     @Override
     public TransactionBufferStats getTransactionBufferStats(String topic,
-                                                            boolean 
lowWaterMarks) throws PulsarAdminException {
-        return sync(() -> getTransactionBufferStatsAsync(topic, 
lowWaterMarks));
+                                                            boolean 
lowWaterMarks,
+                                                            boolean 
segmentStats) throws PulsarAdminException {
+        return sync(() -> getTransactionBufferStatsAsync(topic, lowWaterMarks, 
segmentStats));
     }
 
     @Override
@@ -227,6 +231,22 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
         return sync(() -> getPendingAckInternalStatsAsync(topic, subName, 
metadata));
     }
 
+    @Override
+    public CompletableFuture<TransactionBufferInternalStats> 
getTransactionBufferInternalStatsAsync(String topic,
+                                                                               
                     boolean metadata) {
+        TopicName tn = TopicName.get(topic);
+        WebTarget path = 
adminV3Transactions.path("transactionBufferInternalStats");
+        path = path.path(tn.getRestPath(false));
+        path = path.queryParam("metadata", metadata);
+        return asyncGetRequest(path, new 
FutureCallback<TransactionBufferInternalStats>(){});
+    }
+
+    @Override
+    public TransactionBufferInternalStats 
getTransactionBufferInternalStats(String topic, boolean metadata)
+            throws PulsarAdminException {
+        return sync(() -> getTransactionBufferInternalStatsAsync(topic, 
metadata));
+    }
+
     @Override
     public void scaleTransactionCoordinators(int replicas) throws 
PulsarAdminException {
          sync(() -> scaleTransactionCoordinatorsAsync(replicas));
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 8509d037cba..a722abe19df 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2377,7 +2377,7 @@ public class PulsarAdminToolTest {
 
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("transaction-buffer-stats -t test -l"));
-        verify(transactions).getTransactionBufferStats("test", true);
+        verify(transactions).getTransactionBufferStats("test", true, false);
 
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("pending-ack-stats -t test -s test -l"));
@@ -2387,6 +2387,10 @@ public class PulsarAdminToolTest {
         cmdTransactions.run(split("pending-ack-internal-stats -t test -s 
test"));
         verify(transactions).getPendingAckInternalStats("test", "test", false);
 
+        cmdTransactions = new CmdTransactions(() -> admin);
+        cmdTransactions.run(split("buffer-snapshot-internal-stats -t test"));
+        verify(transactions).getTransactionBufferInternalStats("test", false);
+
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
         verify(transactions).scaleTransactionCoordinators(3);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 08ffba1451f..b999e30b108 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -56,9 +56,14 @@ public class CmdTransactions extends CmdBase {
                 description = "Whether to get information about lowWaterMarks 
stored in transaction buffer.")
         private boolean lowWaterMark;
 
+        @Parameter(names = {"-s", "--segment-stats"},
+                description = "Whether to get segment statistics.")
+        private boolean segmentStats = false;
+
         @Override
         void run() throws Exception {
-            print(getAdmin().transactions().getTransactionBufferStats(topic, 
lowWaterMark));
+            // Forward the segmentStats flag together with lowWaterMark.
+            print(getAdmin().transactions().getTransactionBufferStats(topic, 
lowWaterMark, segmentStats));
         }
     }
 
@@ -188,6 +193,20 @@ public class CmdTransactions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get transaction buffer internal stats")
+    private class GetTransactionBufferInternalStats extends CliCommand {
+        @Parameter(names = {"-t", "--topic"}, description = "Topic name", 
required = true)
+        private String topic;
+
+        @Parameter(names = { "-m", "--metadata" }, description = "Flag to 
include ledger metadata")
+        private boolean metadata = false;
+
+        @Override
+        void run() throws Exception {
+            
print(getAdmin().transactions().getTransactionBufferInternalStats(topic, 
metadata));
+        }
+    }
+
     @Parameters(commandDescription = "Update the scale of transaction 
coordinators")
     private class ScaleTransactionCoordinators extends CliCommand {
         @Parameter(names = { "-r", "--replicas" }, description = "The scale of 
the transaction coordinators")
@@ -242,6 +261,7 @@ public class CmdTransactions extends CmdBase {
         super("transactions", admin);
         jcommander.addCommand("coordinator-internal-stats", new 
GetCoordinatorInternalStats());
         jcommander.addCommand("pending-ack-internal-stats", new 
GetPendingAckInternalStats());
+        jcommander.addCommand("buffer-snapshot-internal-stats", new 
GetTransactionBufferInternalStats());
         jcommander.addCommand("coordinator-stats", new GetCoordinatorStats());
         jcommander.addCommand("transaction-buffer-stats", new 
GetTransactionBufferStats());
         jcommander.addCommand("pending-ack-stats", new GetPendingAckStats());

Reply via email to