gaoran10 commented on code in PR #20330:
URL: https://github.com/apache/pulsar/pull/20330#discussion_r1262322512
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
);
}
+ protected CompletableFuture<TransactionBufferInternalStats>
internalGetTransactionBufferInternalStats(
+ boolean authoritative, boolean metadata) {
+ TransactionBufferInternalStats transactionBufferInternalStats = new
TransactionBufferInternalStats();
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic -> {
+ TransactionBuffer.SnapshotType snapshotType =
topic.getTransactionBuffer().getSnapshotType();
+ if (snapshotType == null) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND,
+ "Transaction buffer Snapshot for the topic
does not exist"));
+ } else if (snapshotType ==
TransactionBuffer.SnapshotType.Segment) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName segmentTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ return getTxnSnapshotInternalStats(segmentTopic,
metadata)
+ .thenApply(snapshotSystemTopicInternalStats ->
{
Review Comment:
Maybe we can use the method `thenAccept` here.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -66,9 +67,10 @@ public interface AbortedTxnProcessor {
/**
* Get the lastSnapshotTimestamps.
- * @return the lastSnapshotTimestamps.
+ *
+ * @return a transactionBufferStats with the stats in the
abortedTxnProcessor.
*/
- long getLastSnapshotTimestamps();
+ TransactionBufferStats generateSnapshotStats(boolean segmentStats);
Review Comment:
Do we need to add a new method for compatibility?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType
subType) {
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}
- public TransactionBufferStats getTransactionBufferStats(boolean
lowWaterMarks) {
- return this.transactionBuffer.getStats(lowWaterMarks);
+ public TransactionBufferStats getTransactionBufferStats(boolean
lowWaterMarks, boolean segmentStats) {
Review Comment:
Do we need to add a new method for compatibility? /cc @BewareMyPower
@codelipenghui
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
);
}
+ protected CompletableFuture<TransactionBufferInternalStats>
internalGetTransactionBufferInternalStats(
+ boolean authoritative, boolean metadata) {
+ TransactionBufferInternalStats transactionBufferInternalStats = new
TransactionBufferInternalStats();
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic -> {
+ TransactionBuffer.SnapshotType snapshotType =
topic.getTransactionBuffer().getSnapshotType();
+ if (snapshotType == null) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND,
+ "Transaction buffer Snapshot for the topic
does not exist"));
+ } else if (snapshotType ==
TransactionBuffer.SnapshotType.Segment) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName segmentTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ return getTxnSnapshotInternalStats(segmentTopic,
metadata)
+ .thenApply(snapshotSystemTopicInternalStats ->
{
+
transactionBufferInternalStats.segmentInternalStats =
+ snapshotSystemTopicInternalStats;
+ return transactionBufferInternalStats;
+ }).thenCompose(ignore -> {
Review Comment:
Maybe we can use `thenCombine` here, for example:
```
var segmentStatsFuture = ...;
var indexStatsFuture = ...;
segmentStatsFuture.thenCombine(indexStatsFuture, (segmentStats, indexStats)
-> {
transactionBufferInternalStats.segmentInternalStats = segmentStats;
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
return transactionBufferInternalStats;
})
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +437,70 @@ protected
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
);
}
+ protected CompletableFuture<TransactionBufferInternalStats>
internalGetTransactionBufferInternalStats(
+ boolean authoritative, boolean metadata) {
+ TransactionBufferInternalStats transactionBufferInternalStats = new
TransactionBufferInternalStats();
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic -> {
+ TransactionBuffer.SnapshotType snapshotType =
topic.getTransactionBuffer().getSnapshotType();
+ if (snapshotType == null) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND,
+ "Transaction buffer Snapshot for the topic
does not exist"));
+ } else if (snapshotType ==
TransactionBuffer.SnapshotType.Segment) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName segmentTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ return getTxnSnapshotInternalStats(segmentTopic,
metadata)
+ .thenApply(snapshotSystemTopicInternalStats ->
{
+
transactionBufferInternalStats.segmentInternalStats =
+ snapshotSystemTopicInternalStats;
+ return transactionBufferInternalStats;
+ }).thenCompose(ignore -> {
+ TopicName indexTopic =
TopicName.get(TopicDomain.persistent.toString(),
+ namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+ return
getTxnSnapshotInternalStats(indexTopic, metadata)
+ .thenApply(indexStats -> {
+
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
+ return
transactionBufferInternalStats;
+ });
+ });
+ } else if (snapshotType ==
TransactionBuffer.SnapshotType.Single) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName singleSnapshotTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ return
getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
+ .thenApply(snapshotSystemTopicInternalStats ->
{
+
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
+ snapshotSystemTopicInternalStats;
+ return transactionBufferInternalStats;
+ });
+ }
+ return FutureUtil.failedFuture(new
RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+ + snapshotType));
+ });
+ }
+
+ private CompletableFuture<SnapshotSystemTopicInternalStats>
getTxnSnapshotInternalStats(TopicName topicName,
+
boolean metadata) {
+ final PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ return FutureUtil.failedFuture(new RestException(e));
+ }
+ NamespaceService ns = pulsar().getNamespaceService();
+ return ns.isServiceUnitOwnedAsync(topicName)
Review Comment:
Why do we need to check the ownership here? The result `isOwner` is not used,
and the method `admin.topics().getInternalStatsAsync` can redirect requests by
itself.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -168,7 +184,7 @@ public interface TransactionBuffer {
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
*/
- TransactionBufferStats getStats(boolean lowWaterMarks);
+ TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats);
Review Comment:
Do we need to add a new method for compatibility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]