gaoran10 commented on code in PR #20330:
URL: https://github.com/apache/pulsar/pull/20330#discussion_r1262322512


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected 
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        return getTxnSnapshotInternalStats(segmentTopic, 
metadata)
+                                .thenApply(snapshotSystemTopicInternalStats -> 
{

Review Comment:
   Maybe we can use the method `thenAccept` here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -66,9 +67,10 @@ public interface AbortedTxnProcessor {
 
     /**
      * Get the lastSnapshotTimestamps.
-     * @return the lastSnapshotTimestamps.
+     *
+     * @return a transactionBufferStats with the stats in the 
abortedTxnProcessor.
      */
-    long getLastSnapshotTimestamps();
+    TransactionBufferStats generateSnapshotStats(boolean segmentStats);

Review Comment:
   Do we need to add a new method for compatibility?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType 
subType) {
         return subTypesEnabled != null && subTypesEnabled.contains(subType);
     }
 
-    public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks) {
-        return this.transactionBuffer.getStats(lowWaterMarks);
+    public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks, boolean segmentStats) {

Review Comment:
   Do we need to add a new method for compatibility? /cc @BewareMyPower 
@codelipenghui 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected 
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        return getTxnSnapshotInternalStats(segmentTopic, 
metadata)
+                                .thenApply(snapshotSystemTopicInternalStats -> 
{
+                                    
transactionBufferInternalStats.segmentInternalStats =
+                                            snapshotSystemTopicInternalStats;
+                                    return transactionBufferInternalStats;
+                                }).thenCompose(ignore -> {

Review Comment:
   Maybe we can use the `thenCombine`, such as this.
   
   ```
   var segmentStatsFuture = ...;
   var indexStatsFuture = ...;
   segmentStatsFuture.thenCombine(indexStatsFuture, (segmentStats, indexStats) 
-> {
     transactionBufferInternalStats.segmentInternalStats = segmentStats;
     transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
     return transactionBufferInternalStats;
   })
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected 
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        return getTxnSnapshotInternalStats(segmentTopic, 
metadata)
+                                .thenApply(snapshotSystemTopicInternalStats -> 
{
+                                    
transactionBufferInternalStats.segmentInternalStats =
+                                            snapshotSystemTopicInternalStats;
+                                    return transactionBufferInternalStats;
+                                }).thenCompose(ignore -> {
+                                    TopicName indexTopic = 
TopicName.get(TopicDomain.persistent.toString(),
+                                            namespaceName,
+                                            
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+                                    return 
getTxnSnapshotInternalStats(indexTopic, metadata)
+                                            .thenApply(indexStats -> {
+                                                
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
+                                                return 
transactionBufferInternalStats;
+                                            });
+                                });
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Single) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        TopicName singleSnapshotTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                        return 
getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
+                                .thenApply(snapshotSystemTopicInternalStats -> 
{
+                                   
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
+                                           snapshotSystemTopicInternalStats;
+                                   return transactionBufferInternalStats;
+                                });
+                    }
+                    return FutureUtil.failedFuture(new 
RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+                            + snapshotType));
+                });
+    }
+
+    private CompletableFuture<SnapshotSystemTopicInternalStats> 
getTxnSnapshotInternalStats(TopicName topicName,
+                                                                               
             boolean metadata) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e) {
+            return FutureUtil.failedFuture(new RestException(e));
+        }
+        NamespaceService ns = pulsar().getNamespaceService();
+        return ns.isServiceUnitOwnedAsync(topicName)

Review Comment:
   Why need to check the ownership here? The result `isOwner` is not used, and 
the method `admin.topics().getInternalStatsAsync` can redirect requests by 
itself.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -168,7 +184,7 @@ public interface TransactionBuffer {
      * Get transaction stats in buffer.
      * @return the transaction stats in buffer.
      */
-    TransactionBufferStats getStats(boolean lowWaterMarks);
+    TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats);

Review Comment:
   Do we need to add a new method for compatibility?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to