This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4e535cb3f4a [feat][broker] PIP-264: Add transaction metrics (#22970) 4e535cb3f4a is described below commit 4e535cb3f4a3482b0d5dc5a3a0a63c87490704e3 Author: Dragos Misca <dragosvic...@users.noreply.github.com> AuthorDate: Thu Jun 27 02:54:43 2024 -0700 [feat][broker] PIP-264: Add transaction metrics (#22970) --- .../org/apache/pulsar/broker/PulsarService.java | 15 ++++ .../broker/service/PersistentTopicAttributes.java | 30 ++++++++ .../service/persistent/PersistentSubscription.java | 7 +- .../service/persistent/PersistentTopicMetrics.java | 14 +++- .../broker/stats/OpenTelemetryTopicStats.java | 27 ++++++- .../OpenTelemetryTransactionCoordinatorStats.java | 87 ++++++++++++++++++++++ ...enTelemetryTransactionPendingAckStoreStats.java | 72 ++++++++++++++++++ .../buffer/TransactionBufferClientStats.java | 7 +- .../buffer/impl/TransactionBufferClientImpl.java | 9 ++- .../impl/TransactionBufferClientStatsImpl.java | 61 +++++++++++++-- .../transaction/pendingack/PendingAckHandle.java | 7 ++ .../pendingack/PendingAckHandleAttributes.java | 63 ++++++++++++++++ .../pendingack/PendingAckHandleStats.java | 7 ++ .../pendingack/impl/PendingAckHandleDisabled.java | 6 ++ .../pendingack/impl/PendingAckHandleImpl.java | 28 ++++--- .../pendingack/impl/PendingAckHandleStatsImpl.java | 56 +++++++++++++- .../pulsar/broker/transaction/TransactionTest.java | 24 +++++- .../buffer/TopicTransactionBufferTest.java | 22 +++++- .../pendingack/PendingAckPersistentTest.java | 40 ++++++++++ .../opentelemetry/OpenTelemetryAttributes.java | 33 +++++++- pulsar-transaction/coordinator/pom.xml | 6 ++ .../coordinator/TransactionMetadataStore.java | 9 +++ .../TransactionMetadataStoreAttributes.java | 56 ++++++-------- .../impl/InMemTransactionMetadataStore.java | 16 ++++ .../impl/MLTransactionMetadataStore.java | 16 ++++ 25 files changed, 640 insertions(+), 78 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0d8bc571c57..848484fe376 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -116,6 +116,8 @@ import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats; import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats; import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; +import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats; +import org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; @@ -263,6 +265,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { private OpenTelemetryConsumerStats openTelemetryConsumerStats; private OpenTelemetryProducerStats openTelemetryProducerStats; private OpenTelemetryReplicatorStats openTelemetryReplicatorStats; + private OpenTelemetryTransactionCoordinatorStats openTelemetryTransactionCoordinatorStats; + private OpenTelemetryTransactionPendingAckStoreStats openTelemetryTransactionPendingAckStoreStats; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; @@ -684,6 +688,14 @@ public class PulsarService implements AutoCloseable, ShutdownService { brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); + if (openTelemetryTransactionPendingAckStoreStats != null) { + openTelemetryTransactionPendingAckStoreStats.close(); + openTelemetryTransactionPendingAckStoreStats = null; + } + if (openTelemetryTransactionCoordinatorStats != null) { + openTelemetryTransactionCoordinatorStats.close(); + openTelemetryTransactionCoordinatorStats = null; + } if (openTelemetryReplicatorStats != null) { openTelemetryReplicatorStats.close(); openTelemetryReplicatorStats = null; @@ -996,6 +1008,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { .newProvider(config.getTransactionBufferProviderClassName()); transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider .newProvider(config.getTransactionPendingAckStoreProviderClassName()); + + openTelemetryTransactionCoordinatorStats = new OpenTelemetryTransactionCoordinatorStats(this); + openTelemetryTransactionPendingAckStoreStats = new OpenTelemetryTransactionPendingAckStoreStats(this); } this.metricsGenerator = new MetricsGenerator(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java index 048edafe884..51f5bdb354d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java @@ -36,6 +36,11 @@ public class PersistentTopicAttributes extends TopicAttributes { private final Attributes transactionCommittedAttributes; private final Attributes transactionAbortedAttributes; + private final Attributes transactionBufferClientCommitSucceededAttributes; + private final Attributes transactionBufferClientCommitFailedAttributes; + private final Attributes transactionBufferClientAbortSucceededAttributes; + private final Attributes transactionBufferClientAbortFailedAttributes; + public PersistentTopicAttributes(TopicName topicName) { super(topicName); @@ -61,6 +66,31 @@ public class PersistentTopicAttributes extends TopicAttributes { .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) .build(); + transactionBufferClientCommitSucceededAttributes = Attributes.builder() + .putAll(commonAttributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes) + .build(); + transactionBufferClientCommitFailedAttributes = Attributes.builder() + .putAll(commonAttributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes) + .build(); + transactionBufferClientAbortSucceededAttributes = Attributes.builder() + .putAll(commonAttributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes) + .build(); + transactionBufferClientAbortFailedAttributes = Attributes.builder() + .putAll(commonAttributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes) + .build(); + compactionSuccessAttributes = Attributes.builder() .putAll(commonAttributes) .putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 7da339a420c..a1d51668ca8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; +import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -128,6 +129,7 @@ public class PersistentSubscription extends AbstractSubscription { private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Map.of(); private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; + @Getter private final PendingAckHandle pendingAckHandle; private volatile Map<String, String> subscriptionProperties; private volatile CompletableFuture<Void> fenceFuture; @@ -1439,11 +1441,6 @@ public class PersistentSubscription extends AbstractSubscription { return cursor; } - @VisibleForTesting - public PendingAckHandle getPendingAckHandle() { - return pendingAckHandle; - } - public void syncBatchPositionBitSetForPendingAck(Position position) { this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java index f79d053a979..d8ebece7a51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java @@ -21,12 +21,13 @@ package org.apache.pulsar.broker.service.persistent; import java.util.concurrent.atomic.LongAdder; import lombok.Getter; -@SuppressWarnings("LombokGetterMayBeUsed") +@Getter public class PersistentTopicMetrics { - @Getter private final BacklogQuotaMetrics backlogQuotaMetrics = new BacklogQuotaMetrics(); + private final TransactionBufferClientMetrics transactionBufferClientMetrics = new TransactionBufferClientMetrics(); + public static class BacklogQuotaMetrics { private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = new LongAdder(); private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = new LongAdder(); @@ -47,4 +48,13 @@ public class PersistentTopicMetrics { return timeBasedBacklogQuotaExceededEvictionCount.longValue(); } } + + @Getter + public static class TransactionBufferClientMetrics { + private final LongAdder commitSucceededCount = new LongAdder(); + private final LongAdder commitFailedCount = new LongAdder(); + + private final LongAdder abortSucceededCount = new LongAdder(); + private final LongAdder abortFailedCount = new LongAdder(); + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java index b6d3f089077..0274cb7a7d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java @@ -149,6 +149,12 @@ public class OpenTelemetryTopicStats implements AutoCloseable { public static final String TRANSACTION_COUNTER = "pulsar.broker.topic.transaction.count"; private final ObservableLongMeasurement transactionCounter; + // Replaces ['pulsar_txn_tb_client_abort_failed_total', 'pulsar_txn_tb_client_commit_failed_total', + // 'pulsar_txn_tb_client_abort_latency', 'pulsar_txn_tb_client_commit_latency'] + public static final String TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER = + "pulsar.broker.topic.transaction.buffer.client.operation.count"; + private final ObservableLongMeasurement transactionBufferClientOperationCounter; + // Replaces pulsar_subscription_delayed public static final String DELAYED_SUBSCRIPTION_COUNTER = "pulsar.broker.topic.subscription.delayed.entry.count"; private final ObservableLongMeasurement delayedSubscriptionCounter; @@ -333,6 +339,12 @@ public class OpenTelemetryTopicStats implements AutoCloseable { .setDescription("The number of transactions on this topic.") .buildObserver(); + transactionBufferClientOperationCounter = meter + .counterBuilder(TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER) + .setUnit("{operation}") + .setDescription("The number of operations on the transaction buffer client.") + .buildObserver(); + delayedSubscriptionCounter = meter .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER) .setUnit("{entry}") @@ -371,6 +383,7 @@ public class OpenTelemetryTopicStats implements AutoCloseable { compactionEntriesCounter, compactionBytesCounter, transactionCounter, + transactionBufferClientOperationCounter, delayedSubscriptionCounter); } @@ -399,6 +412,8 @@ public class OpenTelemetryTopicStats implements AutoCloseable { } if (topic instanceof PersistentTopic persistentTopic) { + var persistentTopicMetrics = persistentTopic.getPersistentTopicMetrics(); + var persistentTopicAttributes = persistentTopic.getTopicAttributes(); var managedLedger = persistentTopic.getManagedLedger(); var managedLedgerStats = persistentTopic.getManagedLedger().getStats(); @@ -416,7 +431,7 @@ public class OpenTelemetryTopicStats implements AutoCloseable { topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(), attributes); backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(), attributes); - var backlogQuotaMetrics = persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics(); + var backlogQuotaMetrics = persistentTopicMetrics.getBacklogQuotaMetrics(); backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(), persistentTopicAttributes.getSizeBasedQuotaAttributes()); backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(), @@ -430,6 +445,16 @@ public class OpenTelemetryTopicStats implements AutoCloseable { transactionCounter.record(txnBuffer.getAbortedTxnCount(), persistentTopicAttributes.getTransactionAbortedAttributes()); + var txnBufferClientMetrics = persistentTopicMetrics.getTransactionBufferClientMetrics(); + transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitSucceededCount().sum(), + persistentTopicAttributes.getTransactionBufferClientCommitSucceededAttributes()); + transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitFailedCount().sum(), + persistentTopicAttributes.getTransactionBufferClientCommitFailedAttributes()); + transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortSucceededCount().sum(), + persistentTopicAttributes.getTransactionBufferClientAbortSucceededAttributes()); + transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortFailedCount().sum(), + persistentTopicAttributes.getTransactionBufferClientAbortFailedAttributes()); + Optional.ofNullable(pulsar.getNullableCompactor()) .map(Compactor::getStats) .flatMap(compactorMXBean -> compactorMXBean.getCompactionRecordForTopic(topic.getName())) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java new file mode 100644 index 00000000000..ab73b2390b3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; + +public class OpenTelemetryTransactionCoordinatorStats implements AutoCloseable { + + // Replaces ['pulsar_txn_aborted_total', + // 'pulsar_txn_committed_total', + // 'pulsar_txn_created_total', + // 'pulsar_txn_timeout_total', + // 'pulsar_txn_active_count'] + public static final String TRANSACTION_COUNTER = "pulsar.broker.transaction.coordinator.transaction.count"; + private final ObservableLongMeasurement transactionCounter; + + // Replaces pulsar_txn_append_log_total + public static final String APPEND_LOG_COUNTER = "pulsar.broker.transaction.coordinator.append.log.count"; + private final ObservableLongMeasurement appendLogCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryTransactionCoordinatorStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + transactionCounter = meter + .upDownCounterBuilder(TRANSACTION_COUNTER) + .setUnit("{transaction}") + .setDescription("The number of transactions handled by the coordinator.") + .buildObserver(); + + appendLogCounter = meter + .counterBuilder(APPEND_LOG_COUNTER) + .setUnit("{entry}") + .setDescription("The number of transaction metadata entries appended by the coordinator.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> { + var transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); + // Avoid NPE during Pulsar shutdown. + if (transactionMetadataStoreService != null) { + transactionMetadataStoreService.getStores() + .values() + .forEach(this::recordMetricsForTransactionMetadataStore); + } + }, + transactionCounter, + appendLogCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetricsForTransactionMetadataStore(TransactionMetadataStore transactionMetadataStore) { + var attributes = transactionMetadataStore.getAttributes(); + var stats = transactionMetadataStore.getMetadataStoreStats(); + + transactionCounter.record(stats.getAbortedCount(), attributes.getTxnAbortedAttributes()); + transactionCounter.record(stats.getActives(), attributes.getTxnActiveAttributes()); + transactionCounter.record(stats.getCommittedCount(), attributes.getTxnCommittedAttributes()); + transactionCounter.record(stats.getCreatedCount(), attributes.getTxnCreatedAttributes()); + transactionCounter.record(stats.getTimeoutCount(), attributes.getTxnTimeoutAttributes()); + + appendLogCounter.record(stats.getAppendLogCount(), attributes.getCommonAttributes()); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java new file mode 100644 index 00000000000..562ad56e44d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; + +public class OpenTelemetryTransactionPendingAckStoreStats implements AutoCloseable { + + // Replaces ['pulsar_txn_tp_committed_count_total', 'pulsar_txn_tp_aborted_count_total'] + public static final String ACK_COUNTER = "pulsar.broker.transaction.pending.ack.store.transaction.count"; + private final ObservableLongCounter ackCounter; + + public OpenTelemetryTransactionPendingAckStoreStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + ackCounter = meter + .counterBuilder(ACK_COUNTER) + .setUnit("{transaction}") + .setDescription("The number of transactions handled by the persistent ack store.") + .buildWithCallback(measurement -> pulsar.getBrokerService() + .getTopics() + .values() + .stream() + .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) + .map(CompletableFuture::join) + .filter(Optional::isPresent) + .map(Optional::get) + .filter(Topic::isPersistent) + .map(Topic::getSubscriptions) + .forEach(subs -> subs.forEach((__, sub) -> recordMetricsForSubscription(measurement, sub)))); + } + + @Override + public void close() { + ackCounter.close(); + } + + private void recordMetricsForSubscription(ObservableLongMeasurement measurement, Subscription subscription) { + assert subscription instanceof PersistentSubscription; // The topics have already been filtered for persistence. + var stats = ((PersistentSubscription) subscription).getPendingAckHandle().getPendingAckHandleStats(); + if (stats != null) { + var attributes = stats.getAttributes(); + measurement.record(stats.getCommitSuccessCount(), attributes.getCommitSuccessAttributes()); + measurement.record(stats.getCommitFailedCount(), attributes.getCommitFailureAttributes()); + measurement.record(stats.getAbortSuccessCount(), attributes.getAbortSuccessAttributes()); + measurement.record(stats.getAbortFailedCount(), attributes.getAbortFailureAttributes()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java index 8fda233ff1d..c21b212f981 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl; import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; @@ -34,10 +35,10 @@ public interface TransactionBufferClientStats { void close(); - static TransactionBufferClientStats create(boolean exposeTopicMetrics, TransactionBufferHandler handler, - boolean enableTxnCoordinator) { + static TransactionBufferClientStats create(PulsarService pulsarService, boolean exposeTopicMetrics, + TransactionBufferHandler handler, boolean enableTxnCoordinator) { return enableTxnCoordinator - ? TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler) : NOOP; + ? TransactionBufferClientStatsImpl.getInstance(pulsarService, exposeTopicMetrics, handler) : NOOP; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java index 382d640ca86..96ad0203900 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java @@ -39,10 +39,11 @@ public class TransactionBufferClientImpl implements TransactionBufferClient { private final TransactionBufferHandler tbHandler; private final TransactionBufferClientStats stats; - private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics, - boolean enableTxnCoordinator) { + private TransactionBufferClientImpl(PulsarService pulsarService, TransactionBufferHandler tbHandler, + boolean exposeTopicLevelMetrics, boolean enableTxnCoordinator) { this.tbHandler = tbHandler; - this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator); + this.stats = TransactionBufferClientStats.create(pulsarService, exposeTopicLevelMetrics, tbHandler, + enableTxnCoordinator); } public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer, @@ -53,7 +54,7 @@ public class TransactionBufferClientImpl implements TransactionBufferClient { ServiceConfiguration config = pulsarService.getConfig(); boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus(); boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled(); - return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, enableTxnCoordinator); + return new TransactionBufferClientImpl(pulsarService, handler, exposeTopicLevelMetrics, enableTxnCoordinator); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java index a447f707893..4f1c2ca30cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java @@ -18,31 +18,55 @@ */ package org.apache.pulsar.broker.transaction.buffer.impl; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Summary; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; +import lombok.NonNull; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats; import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats { private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1}; private final AtomicBoolean closed = new AtomicBoolean(false); + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER) private final Counter abortFailed; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER) private final Counter commitFailed; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER) private final Summary abortLatency; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER) private final Summary commitLatency; + + public static final String PENDING_TRANSACTION_COUNTER = "pulsar.broker.transaction.buffer.client.pending.count"; + private final ObservableLongUpDownCounter pendingTransactionCounter; + + @PulsarDeprecatedMetric(newMetricName = PENDING_TRANSACTION_COUNTER) private final Gauge pendingRequests; private final boolean exposeTopicLevelMetrics; + private final BrokerService brokerService; + private static TransactionBufferClientStats instance; - private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics, - TransactionBufferHandler handler) { + private TransactionBufferClientStatsImpl(@NonNull PulsarService pulsarService, + boolean exposeTopicLevelMetrics, + @NonNull TransactionBufferHandler handler) { + this.brokerService = Objects.requireNonNull(pulsarService.getBrokerService()); this.exposeTopicLevelMetrics = exposeTopicLevelMetrics; String[] labelNames = exposeTopicLevelMetrics ? new String[]{"namespace", "topic"} : new String[]{"namespace"}; @@ -63,9 +87,14 @@ public final class TransactionBufferClientStatsImpl implements TransactionBuffer .setChild(new Gauge.Child() { @Override public double get() { - return null == handler ? 0 : handler.getPendingRequestsCount(); + return handler.getPendingRequestsCount(); } }); + this.pendingTransactionCounter = pulsarService.getOpenTelemetry().getMeter() + .upDownCounterBuilder(PENDING_TRANSACTION_COUNTER) + .setDescription("The number of pending transactions in the transaction buffer client.") + .setUnit("{transaction}") + .buildWithCallback(measurement -> measurement.record(handler.getPendingRequestsCount())); } private Summary buildSummary(String name, String help, String[] labelNames) { @@ -77,33 +106,52 @@ public final class TransactionBufferClientStatsImpl implements TransactionBuffer return builder.register(); } - public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics, + public static synchronized TransactionBufferClientStats getInstance(PulsarService pulsarService, + boolean exposeTopicLevelMetrics, TransactionBufferHandler handler) { if (null == instance) { - instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler); + instance = new TransactionBufferClientStatsImpl(pulsarService, exposeTopicLevelMetrics, handler); } - return instance; } @Override public void recordAbortFailed(String topic) { this.abortFailed.labels(labelValues(topic)).inc(); + getTransactionBufferClientMetrics(topic) + .map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortFailedCount) + .ifPresent(LongAdder::increment); } @Override public void recordCommitFailed(String topic) { this.commitFailed.labels(labelValues(topic)).inc(); + getTransactionBufferClientMetrics(topic) + .map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitFailedCount) + .ifPresent(LongAdder::increment); } @Override public void recordAbortLatency(String topic, long nanos) { this.abortLatency.labels(labelValues(topic)).observe(nanos); + getTransactionBufferClientMetrics(topic) + .map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortSucceededCount) + .ifPresent(LongAdder::increment); } @Override public void recordCommitLatency(String topic, long nanos) { this.commitLatency.labels(labelValues(topic)).observe(nanos); + getTransactionBufferClientMetrics(topic) + .map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitSucceededCount) + .ifPresent(LongAdder::increment); + } + + private Optional<PersistentTopicMetrics.TransactionBufferClientMetrics> getTransactionBufferClientMetrics( + String topic) { + return brokerService.getTopicReference(topic) + .filter(t -> t instanceof PersistentTopic) + .map(t -> ((PersistentTopic) t).getPersistentTopicMetrics().getTransactionBufferClientMetrics()); } private String[] labelValues(String topic) { @@ -125,6 +173,7 @@ public final class TransactionBufferClientStatsImpl implements TransactionBuffer CollectorRegistry.defaultRegistry.unregister(this.abortLatency); CollectorRegistry.defaultRegistry.unregister(this.commitLatency); CollectorRegistry.defaultRegistry.unregister(this.pendingRequests); + pendingTransactionCounter.close(); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java index 168a6b1483f..dcebbb2829e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java @@ -145,6 +145,13 @@ public interface PendingAckHandle { */ TransactionPendingAckStats getStats(boolean lowWaterMarks); + /** + * Get the raw pending ack handle stats. + * + * @return the raw stats of this pending ack handle. + */ + PendingAckHandleStats getPendingAckHandleStats(); + /** * Close the pending ack handle. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java new file mode 100644 index 00000000000..87363b673e1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import io.opentelemetry.api.common.Attributes; +import lombok.Getter; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionPendingAckOperationStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionStatus; + +@Getter +public class PendingAckHandleAttributes { + + private final Attributes commitSuccessAttributes; + private final Attributes commitFailureAttributes; + private final Attributes abortSuccessAttributes; + private final Attributes abortFailureAttributes; + + public PendingAckHandleAttributes(String topic, String subscription) { + var topicName = TopicName.get(topic); + commitSuccessAttributes = getAttributes(topicName, subscription, TransactionStatus.COMMITTED, + TransactionPendingAckOperationStatus.SUCCESS); + commitFailureAttributes = getAttributes(topicName, subscription, TransactionStatus.COMMITTED, + TransactionPendingAckOperationStatus.FAILURE); + abortSuccessAttributes = getAttributes(topicName, subscription, TransactionStatus.ABORTED, + TransactionPendingAckOperationStatus.SUCCESS); + abortFailureAttributes = getAttributes(topicName, subscription, TransactionStatus.ABORTED, + TransactionPendingAckOperationStatus.FAILURE); + } + + private static Attributes getAttributes(TopicName topicName, String subscriptionName, + TransactionStatus txStatus, + TransactionPendingAckOperationStatus txAckStoreStatus) { + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscriptionName) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()) + .putAll(txStatus.attributes) + .putAll(txAckStoreStatus.attributes); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + return builder.build(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java index 7f01b9b69f9..855651c9116 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java @@ -28,6 +28,13 @@ public interface PendingAckHandleStats { void close(); + long getCommitSuccessCount(); + long getCommitFailedCount(); + long getAbortSuccessCount(); + long getAbortFailedCount(); + + PendingAckHandleAttributes getAttributes(); + static PendingAckHandleStats create(String topic, String subName, boolean exposeTopicLevelMetrics) { return new PendingAckHandleStatsImpl(topic, subName, exposeTopicLevelMetrics); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java index 4d5852ea33d..fb633f7af65 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; @@ -91,6 +92,11 @@ public class PendingAckHandleDisabled implements PendingAckHandle { return null; } + @Override + public PendingAckHandleStats getPendingAckHandleStats() { + return null; + } + @Override public CompletableFuture<Void> closeAsync() { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 98d0d3bf1b9..6a071c891ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -155,20 +154,14 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi this.topicName = persistentSubscription.getTopicName(); this.subName = persistentSubscription.getName(); this.persistentSubscription = persistentSubscription; - internalPinnedExecutor = persistentSubscription - .getTopic() - .getBrokerService() - .getPulsar() - .getTransactionExecutorProvider() - .getExecutor(this); - - ServiceConfiguration config = persistentSubscription.getTopic().getBrokerService().pulsar().getConfig(); - boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus(); - this.handleStats = PendingAckHandleStats.create(topicName, subName, exposeTopicLevelMetrics); - - this.pendingAckStoreProvider = this.persistentSubscription.getTopic() - .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider(); - transactionOpTimer = persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer(); + var pulsar = persistentSubscription.getTopic().getBrokerService().getPulsar(); + internalPinnedExecutor = pulsar.getTransactionExecutorProvider().getExecutor(this); + + this.handleStats = PendingAckHandleStats.create( + topicName, subName, pulsar.getConfig().isExposeTopicLevelMetricsInPrometheus()); + + this.pendingAckStoreProvider = pulsar.getTransactionPendingAckStoreProvider(); + transactionOpTimer = pulsar.getTransactionTimer(); init(); } @@ -1021,6 +1014,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi return transactionInPendingAckStats; } + @Override + public PendingAckHandleStats getPendingAckHandleStats() { + return handleStats; + } + @Override public CompletableFuture<Void> closeAsync() { changeToCloseState(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java index f30c233af59..a89b582b838 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java @@ -22,7 +22,10 @@ import io.prometheus.client.Counter; import io.prometheus.client.Summary; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleAttributes; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.common.naming.TopicName; @@ -37,6 +40,19 @@ public class PendingAckHandleStatsImpl implements PendingAckHandleStats { private final String[] labelFailed; private final String[] commitLatencyLabel; + private final String topic; + private final String subscription; + + private final LongAdder commitTxnSucceedCounter = new LongAdder(); + private final LongAdder commitTxnFailedCounter = new LongAdder(); + private final LongAdder abortTxnSucceedCounter = new LongAdder(); + private final LongAdder abortTxnFailedCounter = new LongAdder(); + + private volatile PendingAckHandleAttributes attributes = null; + private static final AtomicReferenceFieldUpdater<PendingAckHandleStatsImpl, PendingAckHandleAttributes> + ATTRIBUTES_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + PendingAckHandleStatsImpl.class, PendingAckHandleAttributes.class, "attributes"); + public PendingAckHandleStatsImpl(String topic, String subscription, boolean exposeTopicLevelMetrics) { initialize(exposeTopicLevelMetrics); @@ -51,6 +67,9 @@ public class PendingAckHandleStatsImpl implements PendingAckHandleStats { } } + this.topic = topic; + this.subscription = subscription; + labelSucceed = exposeTopicLevelMetrics0 ? new String[]{namespace, topic, subscription, "succeed"} : new String[]{namespace, "succeed"}; labelFailed = exposeTopicLevelMetrics0 @@ -62,18 +81,24 @@ public class PendingAckHandleStatsImpl implements PendingAckHandleStats { @Override public void recordCommitTxn(boolean success, long nanos) { String[] labels; + LongAdder counter; if (success) { labels = labelSucceed; + counter = commitTxnSucceedCounter; commitTxnLatency.labels(commitLatencyLabel).observe(TimeUnit.NANOSECONDS.toMicros(nanos)); } else { labels = labelFailed; + counter = commitTxnFailedCounter; } commitTxnCounter.labels(labels).inc(); + counter.increment(); } @Override public void recordAbortTxn(boolean success) { abortTxnCounter.labels(success ? labelSucceed : labelFailed).inc(); + var counter = success ? abortTxnSucceedCounter : abortTxnFailedCounter; + counter.increment(); } @Override @@ -81,11 +106,40 @@ public class PendingAckHandleStatsImpl implements PendingAckHandleStats { if (exposeTopicLevelMetrics0) { commitTxnCounter.remove(this.labelSucceed); commitTxnCounter.remove(this.labelFailed); + abortTxnCounter.remove(this.labelSucceed); abortTxnCounter.remove(this.labelFailed); - abortTxnCounter.remove(this.labelFailed); } } + @Override + public long getCommitSuccessCount() { + return commitTxnSucceedCounter.sum(); + } + + @Override + public long getCommitFailedCount() { + return commitTxnFailedCounter.sum(); + } + + @Override + public long getAbortSuccessCount() { + return abortTxnSucceedCounter.sum(); + } + + @Override + public long getAbortFailedCount() { + return abortTxnFailedCounter.sum(); + } + + @Override + public PendingAckHandleAttributes getAttributes() { + if (attributes != null) { + return attributes; + } + return ATTRIBUTES_UPDATER.updateAndGet(PendingAckHandleStatsImpl.this, + old -> old != null ? old : new PendingAckHandleAttributes(topic, subscription)); + } + static void initialize(boolean exposeTopicLevelMetrics) { if (INITIALIZED.compareAndSet(false, true)) { exposeTopicLevelMetrics0 = exposeTopicLevelMetrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 14d4375b7bf..2a928084e64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME; import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; @@ -94,7 +95,6 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; @@ -231,19 +231,35 @@ public class TransactionTest extends TransactionTestBase { .build(); var metrics = pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(); - BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, + Attributes.builder() + .putAll(attributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes) + .build(), + 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, + Attributes.builder() + .putAll(attributes) + .remove(OpenTelemetryAttributes.PULSAR_DOMAIN) + .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes) + .build(), + 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, Attributes.builder() .putAll(attributes) .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed") .build(), 1); - BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, Attributes.builder() .putAll(attributes) .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted") .build(), 1); - BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, Attributes.builder() .putAll(attributes) .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index af12caf1efd..dea79f391e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.when; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; +import io.opentelemetry.api.common.Attributes; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -44,6 +46,7 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; @@ -57,6 +60,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; @@ -70,7 +74,6 @@ import org.testng.annotations.Test; public class TopicTransactionBufferTest extends TransactionTestBase { - @BeforeMethod(alwaysRun = true) protected void setup() throws Exception { setBrokerCount(1); @@ -101,10 +104,19 @@ public class TopicTransactionBufferTest extends TransactionTestBase { @Test public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { final String topic = "persistent://" + NAMESPACE1 + "/testPendingAckManageLedgerWriteFailState"; + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic) + .putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes) + .putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes) + .build(); + Transaction txn = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS) .build().get(); + @Cleanup Producer<byte[]> producer = pulsarClient .newProducer() .topic(topic) @@ -112,11 +124,19 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .enableBatching(false) .create(); + assertMetricLongSumValue( + pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(), + OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, attributes, 0); + producer.newMessage(txn).value("test".getBytes()).send(); PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true); txn.commit().get(); + + assertMetricLongSumValue( + pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(), + OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, attributes, 1); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 00cdb4162f0..9487e3d3746 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction.pendingack; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.mockito.ArgumentMatchers.any; @@ -31,6 +32,7 @@ import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; import com.google.common.collect.Multimap; +import io.opentelemetry.api.common.Attributes; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -56,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; @@ -78,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -366,6 +370,42 @@ public class PendingAckPersistentTest extends TransactionTestBase { assertTrue(metric.value > 0); } } + + var otelMetrics = pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(); + var commonAttributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, TopicName.get(PENDING_ACK_REPLAY_TOPIC).toString()) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subName) + .build(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER, + Attributes.builder() + .putAll(commonAttributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed") + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, "success") + .build(), + 50); + assertMetricLongSumValue(otelMetrics, OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER, + Attributes.builder() + .putAll(commonAttributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed") + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, "failure") + .build(), + 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER, + Attributes.builder() + .putAll(commonAttributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted") + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, "success") + .build(), + 50); + assertMetricLongSumValue(otelMetrics, OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER, + Attributes.builder() + .putAll(commonAttributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted") + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, "failure") + .build(), + 0); } @Test diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index b530b50ee59..f485e300926 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -117,12 +117,43 @@ public interface OpenTelemetryAttributes { */ AttributeKey<String> PULSAR_TRANSACTION_STATUS = AttributeKey.stringKey("pulsar.transaction.status"); enum TransactionStatus { + ABORTED, ACTIVE, COMMITTED, - ABORTED; + CREATED, + TIMEOUT; public final Attributes attributes = Attributes.of(PULSAR_TRANSACTION_STATUS, name().toLowerCase()); } + /** + * The status of the Pulsar transaction ack store operation. + */ + AttributeKey<String> PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.transaction.pending.ack.store.operation.status"); + enum TransactionPendingAckOperationStatus { + SUCCESS, + FAILURE; + public final Attributes attributes = + Attributes.of(PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, name().toLowerCase()); + } + + /** + * The ID of the Pulsar transaction coordinator. + */ + AttributeKey<Long> PULSAR_TRANSACTION_COORDINATOR_ID = AttributeKey.longKey("pulsar.transaction.coordinator.id"); + + /** + * The status of the Pulsar transaction buffer client operation. + */ + AttributeKey<String> PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.transaction.buffer.client.operation.status"); + enum TransactionBufferClientOperationStatus { + SUCCESS, + FAILURE; + public final Attributes attributes = + Attributes.of(PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS, name().toLowerCase()); + } + /** * The status of the Pulsar compaction operation. */ diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml index 4728cd40634..fc326d9e9ba 100644 --- a/pulsar-transaction/coordinator/pom.xml +++ b/pulsar-transaction/coordinator/pom.xml @@ -41,6 +41,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-opentelemetry</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>managed-ledger</artifactId> diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index ff5adb4d409..850fcfb4d19 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -133,6 +133,15 @@ public interface TransactionMetadataStore { */ TransactionMetadataStoreStats getMetadataStoreStats(); + /** + * Get the transaction metadata store OpenTelemetry attributes. + * + * @return TransactionMetadataStoreAttributes {@link TransactionMetadataStoreAttributes} + */ + default TransactionMetadataStoreAttributes getAttributes() { + return new TransactionMetadataStoreAttributes(this); + } + /** * Get the transactions witch timeout is bigger than given timeout. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java similarity index 50% copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java copy to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java index 048edafe884..e8ae0f6d039 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java @@ -16,58 +16,44 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service; +package org.apache.pulsar.transaction.coordinator; import io.opentelemetry.api.common.Attributes; import lombok.Getter; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; @Getter -public class PersistentTopicAttributes extends TopicAttributes { - - private final Attributes timeBasedQuotaAttributes; - private final Attributes sizeBasedQuotaAttributes; - - private final Attributes compactionSuccessAttributes; - private final Attributes compactionFailureAttributes; - - private final Attributes transactionActiveAttributes; - private final Attributes transactionCommittedAttributes; - private final Attributes transactionAbortedAttributes; - - public PersistentTopicAttributes(TopicName topicName) { - super(topicName); - - timeBasedQuotaAttributes = Attributes.builder() - .putAll(commonAttributes) - .putAll(OpenTelemetryAttributes.BacklogQuotaType.TIME.attributes) - .build(); - sizeBasedQuotaAttributes = Attributes.builder() +public class TransactionMetadataStoreAttributes { + + private final Attributes commonAttributes; + private final Attributes txnAbortedAttributes; + private final Attributes txnActiveAttributes; + private final Attributes txnCommittedAttributes; + private final Attributes txnCreatedAttributes; + private final Attributes txnTimeoutAttributes; + + public TransactionMetadataStoreAttributes(TransactionMetadataStore store) { + this.commonAttributes = Attributes.of( + OpenTelemetryAttributes.PULSAR_TRANSACTION_COORDINATOR_ID, store.getTransactionCoordinatorID().getId()); + this.txnAbortedAttributes = Attributes.builder() .putAll(commonAttributes) - .putAll(OpenTelemetryAttributes.BacklogQuotaType.SIZE.attributes) + .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) .build(); - - transactionActiveAttributes = Attributes.builder() + this.txnActiveAttributes = Attributes.builder() .putAll(commonAttributes) .putAll(OpenTelemetryAttributes.TransactionStatus.ACTIVE.attributes) .build(); - transactionCommittedAttributes = Attributes.builder() + this.txnCommittedAttributes = Attributes.builder() .putAll(commonAttributes) .putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes) .build(); - transactionAbortedAttributes = Attributes.builder() - .putAll(commonAttributes) - .putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes) - .build(); - - compactionSuccessAttributes = Attributes.builder() + this.txnCreatedAttributes = Attributes.builder() .putAll(commonAttributes) - .putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes) + .putAll(OpenTelemetryAttributes.TransactionStatus.CREATED.attributes) .build(); - compactionFailureAttributes = Attributes.builder() + this.txnTimeoutAttributes = Attributes.builder() .putAll(commonAttributes) - .putAll(OpenTelemetryAttributes.CompactionStatus.FAILURE.attributes) + .putAll(OpenTelemetryAttributes.TransactionStatus.TIMEOUT.attributes) .build(); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 0f3c5e42d7a..7817d484875 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -23,12 +23,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; @@ -49,6 +51,11 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { private final LongAdder abortTransactionCount; private final LongAdder transactionTimeoutCount; + private volatile TransactionMetadataStoreAttributes attributes = null; + private static final AtomicReferenceFieldUpdater<InMemTransactionMetadataStore, TransactionMetadataStoreAttributes> + ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + InMemTransactionMetadataStore.class, TransactionMetadataStoreAttributes.class, "attributes"); + InMemTransactionMetadataStore(TransactionCoordinatorID tcID) { this.tcID = tcID; this.localID = new AtomicLong(0L); @@ -165,4 +172,13 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { public List<TxnMeta> getSlowTransactions(long timeout) { return null; } + + @Override + public TransactionMetadataStoreAttributes getAttributes() { + if (attributes != null) { + return attributes; + } + return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, + old -> old != null ? old : new TransactionMetadataStoreAttributes(this)); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b6eaad2e3e3..6bd7a947e38 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; @@ -45,6 +46,7 @@ import org.apache.pulsar.common.util.RecoverTimeRecord; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; @@ -83,6 +85,11 @@ public class MLTransactionMetadataStore public final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); private final long maxActiveTransactionsPerCoordinator; + private volatile TransactionMetadataStoreAttributes attributes = null; + private static final AtomicReferenceFieldUpdater<MLTransactionMetadataStore, TransactionMetadataStoreAttributes> + ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + MLTransactionMetadataStore.class, TransactionMetadataStoreAttributes.class, "attributes"); + public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog, TransactionTimeoutTracker timeoutTracker, @@ -549,4 +556,13 @@ public class MLTransactionMetadataStore public ManagedLedger getManagedLedger() { return this.transactionLog.getManagedLedger(); } + + @Override + public TransactionMetadataStoreAttributes getAttributes() { + if (attributes != null) { + return attributes; + } + return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, + old -> old != null ? old : new TransactionMetadataStoreAttributes(this)); + } }