eolivelli commented on code in PR #15675:
URL: https://github.com/apache/pulsar/pull/15675#discussion_r881321048
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -543,11 +547,33 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}
@Override
- public TransactionBufferStats getStats() {
+ public TransactionBufferStats getStats(boolean lowWaterMarks) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition = this.maxReadPosition.toString();
+ if (lowWaterMarks) {
+ transactionBufferStats.lowWaterMarks = this.lowWaterMarks;
+ }
+ transactionBufferStats.ongoingTxns = ongoingTxns.size();
+
+ Optional<LookupResult> lookupResultOptional = Optional.empty();
+ try {
+ lookupResultOptional = this.topic
+ .getBrokerService()
+ .getPulsar()
+ .getNamespaceService()
+ .getBrokerServiceUrlAsync(TopicName.get(topic.getName()),
+ LookupOptions.builder().loadTopicsInBundle(false).build())
+ .get();
+ } catch (Exception e) {
+ // Just log the exception, nothing else to do
+ log.warn("getBrokerServiceUrl [{}]", e.getMessage(), e);
+ }
+ if (lookupResultOptional.isPresent()) {
+ LookupData lookupData = lookupResultOptional.get().getLookupData();
+ transactionBufferStats.brokerOwnerURL = lookupData.getBrokerUrl();
Review Comment:
I think the client can do this lookup locally. Using broker resources to do
lookups may be expensive. This method is not supposed to be called frequently,
but unless there is a strict need, I would prefer not to do the lookup inside
this method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -858,9 +865,30 @@ public CompletableFuture<PendingAckHandle> pendingAckHandleFuture() {
}
@Override
- public TransactionPendingAckStats getStats() {
+ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
TransactionPendingAckStats transactionPendingAckStats = new TransactionPendingAckStats();
transactionPendingAckStats.state = this.getState().name();
+ if (lowWaterMarks) {
+ transactionPendingAckStats.lowWaterMarks = this.lowWaterMarks;
+ }
+ if (individualAckOfTransaction != null) {
+ transactionPendingAckStats.ongoingTxns = individualAckOfTransaction.size();
+ } else {
+ transactionPendingAckStats.ongoingTxns = 0;
+ }
+ Optional<LookupResult> lookupResultOptional = this.persistentSubscription
Review Comment:
The same applies here: I would prefer not to do the lookup inside this method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -543,11 +547,33 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}
@Override
- public TransactionBufferStats getStats() {
+ public TransactionBufferStats getStats(boolean lowWaterMarks) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition = this.maxReadPosition.toString();
+ if (lowWaterMarks) {
+ transactionBufferStats.lowWaterMarks = this.lowWaterMarks;
+ }
+ transactionBufferStats.ongoingTxns = ongoingTxns.size();
+
+ Optional<LookupResult> lookupResultOptional = Optional.empty();
+ try {
+ lookupResultOptional = this.topic
+ .getBrokerService()
+ .getPulsar()
+ .getNamespaceService()
+ .getBrokerServiceUrlAsync(TopicName.get(topic.getName()),
+ LookupOptions.builder().loadTopicsInBundle(false).build())
+ .get();
Review Comment:
Please add a timeout to this `.get()` call so that it cannot block the calling
thread indefinitely.
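
A minimal sketch of a bounded wait, using only `java.util.concurrent`. The
class `BoundedLookup`, the helper `getWithTimeout`, and the 100 ms value are
hypothetical names and numbers chosen for illustration; they are not part of
the PR, and the right timeout for the broker is a separate decision.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedLookup {

    // Hypothetical helper: waits at most timeoutMs for the future,
    // and falls back to Optional.empty() on timeout or failure.
    static <T> Optional<T> getWithTimeout(CompletableFuture<Optional<T>> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the result; the caller simply omits the field.
            future.cancel(true);
            return Optional.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            // The lookup itself failed; nothing else to do here.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-ins for a lookup future: one already complete, one that never completes.
        CompletableFuture<Optional<String>> done =
                CompletableFuture.completedFuture(Optional.of("pulsar://broker-1:6650"));
        CompletableFuture<Optional<String>> never = new CompletableFuture<>();

        System.out.println(getWithTimeout(done, 100));
        System.out.println(getWithTimeout(never, 100));
    }
}
```

With this shape, a slow or stuck lookup only costs the stats call the
configured timeout, and the owner URL is left unset in that case.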
##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java:
##########
@@ -121,35 +121,41 @@ TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
+ * @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the future stats of transaction buffer in topic.
*/
- CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic);
+ CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);
Review Comment:
Please keep the old method. You can give it a `default` implementation that
calls the new method; otherwise this is a breaking API change.
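
A minimal sketch of that pattern, with simplified stand-in types. The names
`DefaultMethodCompat`, `TransactionsApi`, and `BufferStats` are hypothetical
and only mirror the shape of the real interface; passing `false` as the
default for `lowWaterMarks` is an assumption.

```java
import java.util.concurrent.CompletableFuture;

public class DefaultMethodCompat {

    // Simplified stand-in for the stats type (hypothetical).
    static class BufferStats {
        final String topic;
        final boolean includesLowWaterMarks;

        BufferStats(String topic, boolean includesLowWaterMarks) {
            this.topic = topic;
            this.includesLowWaterMarks = includesLowWaterMarks;
        }
    }

    // Simplified stand-in for the admin interface (hypothetical).
    interface TransactionsApi {

        // New method: implementations only need to provide this one.
        CompletableFuture<BufferStats> getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);

        // Old signature kept as a default method, so existing callers still compile.
        // Assumption: omitting the flag means "do not include lowWaterMarks".
        default CompletableFuture<BufferStats> getTransactionBufferStatsAsync(String topic) {
            return getTransactionBufferStatsAsync(topic, false);
        }
    }

    public static void main(String[] args) {
        // The implementation supplies only the two-argument method.
        TransactionsApi api = (topic, lowWaterMarks) ->
                CompletableFuture.completedFuture(new BufferStats(topic, lowWaterMarks));

        BufferStats viaOld = api.getTransactionBufferStatsAsync("my-topic").join();
        BufferStats viaNew = api.getTransactionBufferStatsAsync("my-topic", true).join();

        System.out.println(viaOld.includesLowWaterMarks);
        System.out.println(viaNew.includesLowWaterMarks);
    }
}
```

Existing callers of the one-argument method keep working unchanged, while new
callers can opt in to the extra data.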