This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f4d1d05ee38 [feat][broker] PIP-264: Add OpenTelemetry managed ledger metrics (#22987) f4d1d05ee38 is described below commit f4d1d05ee385bd730cbb4fa09a287614a00400a3 Author: Dragos Misca <dragosvic...@users.noreply.github.com> AuthorDate: Wed Jul 3 12:25:39 2024 -0700 [feat][broker] PIP-264: Add OpenTelemetry managed ledger metrics (#22987) --- .../apache/bookkeeper/mledger/ManagedLedger.java | 7 + .../mledger/ManagedLedgerAttributes.java | 57 ++++++++ .../bookkeeper/mledger/ManagedLedgerMXBean.java | 35 +++++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 3 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 + .../mledger/impl/ManagedLedgerMBeanImpl.java | 35 +++++ .../impl/OpenTelemetryManagedLedgerStats.java | 153 +++++++++++++++++++++ .../broker/stats/ManagedLedgerMetricsTest.java | 100 +++++++++++++- .../opentelemetry/OpenTelemetryAttributes.java | 17 +++ 9 files changed, 406 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 955a0d78502..a9242d5cc65 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -696,4 +696,11 @@ public interface ManagedLedger { * Check if managed ledger should cache backlog reads. */ void checkCursorsToCacheEntries(); + + /** + * Get managed ledger attributes. 
+ */ + default ManagedLedgerAttributes getManagedLedgerAttributes() { + return new ManagedLedgerAttributes(this); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java new file mode 100644 index 00000000000..c3759a533a5 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger; + +import io.opentelemetry.api.common.Attributes; +import lombok.Getter; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOperationStatus; + +@Getter +public class ManagedLedgerAttributes { + + private final Attributes attributes; + private final Attributes attributesOperationSucceed; + private final Attributes attributesOperationFailure; + + public ManagedLedgerAttributes(ManagedLedger ml) { + var mlName = ml.getName(); + attributes = Attributes.of( + OpenTelemetryAttributes.ML_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName) + ); + attributesOperationSucceed = Attributes.builder() + .putAll(attributes) + .putAll(ManagedLedgerOperationStatus.SUCCESS.attributes) + .build(); + attributesOperationFailure = Attributes.builder() + .putAll(attributes) + .putAll(ManagedLedgerOperationStatus.FAILURE.attributes) + .build(); + } + + private static String getNamespace(String mlName) { + try { + return TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)).getNamespace(); + } catch (RuntimeException e) { + return null; + } + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index 44345c430b7..1d978e23785 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -60,11 +60,21 @@ public interface ManagedLedgerMXBean { */ double getAddEntryBytesRate(); + /** + * @return the total number of bytes written + */ + long getAddEntryBytesTotal(); + /** * @return the bytes/s rate of messages added with replicas */ double getAddEntryWithReplicasBytesRate(); + /** + * @return the total number of bytes 
written, including replicas + */ + long getAddEntryWithReplicasBytesTotal(); + /** * @return the msg/s rate of messages read */ @@ -75,11 +85,21 @@ public interface ManagedLedgerMXBean { */ double getReadEntriesBytesRate(); + /** + * @return the total number of bytes read + */ + long getReadEntriesBytesTotal(); + /** * @return the rate of mark-delete ops/s */ double getMarkDeleteRate(); + /** + * @return the number of mark-delete ops + */ + long getMarkDeleteTotal(); + /** * @return the number of addEntry requests that succeeded */ @@ -95,6 +115,11 @@ public interface ManagedLedgerMXBean { */ long getAddEntryErrors(); + /** + * @return the total number of addEntry requests that failed + */ + long getAddEntryErrorsTotal(); + /** * @return the number of entries read from the managed ledger (from cache or BK) */ @@ -115,11 +140,21 @@ public interface ManagedLedgerMXBean { */ long getReadEntriesErrors(); + /** + * @return the total number of readEntries requests that failed + */ + long getReadEntriesErrorsTotal(); + /** * @return the number of readEntries requests that cache miss Rate */ double getReadEntriesOpsCacheMissesRate(); + /** + * @return the total number of readEntries requests that cache miss + */ + long getReadEntriesOpsCacheMissesTotal(); + // Entry size statistics double getEntrySizeAverage(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index fc291b801c8..b1939f40e93 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -121,6 +121,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final MetadataStore metadataStore; private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; + private final 
OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats; //indicate whether shutdown() is called. private volatile boolean closed; @@ -229,6 +230,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { metadataStore.registerSessionListener(this::handleMetadataStoreNotification); openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); + openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this); } static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { @@ -620,6 +622,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { })); }).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. + openTelemetryManagedLedgerStats.close(); openTelemetryCacheStats.close(); scheduledExecutor.shutdownNow(); entryCacheManager.clear(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 8d1919dd052..b7734906f75 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -96,6 +96,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerAttributes; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; @@ -326,6 +327,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { */ final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new 
ConcurrentLinkedQueue<>(); + @Getter + private final ManagedLedgerAttributes managedLedgerAttributes; + /** * This variable is used for testing the tests. * ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata() @@ -338,6 +342,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final String name) { this(factory, bookKeeper, store, config, scheduledExecutor, name, null); } + public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { @@ -373,6 +378,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching(); this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching(); this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching(); + this.managedLedgerAttributes = new ManagedLedgerAttributes(this); } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 5e5161a29ca..86320f92924 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -210,11 +210,21 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { return addEntryOps.getValueRate(); } + @Override + public long getAddEntryBytesTotal() { + return addEntryOps.getTotalValue(); + } + @Override public double getAddEntryWithReplicasBytesRate() { return addEntryWithReplicasOps.getValueRate(); } + @Override + public long 
getAddEntryWithReplicasBytesTotal() { + return addEntryWithReplicasOps.getTotalValue(); + } + @Override public double getReadEntriesRate() { return readEntriesOps.getRate(); @@ -225,6 +235,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { return readEntriesOps.getValueRate(); } + @Override + public long getReadEntriesBytesTotal() { + return readEntriesOps.getTotalValue(); + } + @Override public long getAddEntrySucceed() { return addEntryOps.getCount(); @@ -240,6 +255,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { return addEntryOpsFailed.getCount(); } + @Override + public long getAddEntryErrorsTotal() { + return addEntryOpsFailed.getTotalCount(); + } + @Override public long getReadEntriesSucceeded() { return readEntriesOps.getCount(); @@ -255,16 +275,31 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { return readEntriesOpsFailed.getCount(); } + @Override + public long getReadEntriesErrorsTotal() { + return readEntriesOpsFailed.getTotalCount(); + } + @Override public double getReadEntriesOpsCacheMissesRate() { return readEntriesOpsCacheMisses.getRate(); } + @Override + public long getReadEntriesOpsCacheMissesTotal() { + return readEntriesOpsCacheMisses.getTotalCount(); + } + @Override public double getMarkDeleteRate() { return markDeleteOps.getRate(); } + @Override + public long getMarkDeleteTotal() { + return markDeleteOps.getTotalCount(); + } + @Override public double getEntrySizeAverage() { return entryStats.getAvg(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java new file mode 100644 index 00000000000..f7b9d91dff6 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.pulsar.opentelemetry.Constants; + +public class OpenTelemetryManagedLedgerStats implements AutoCloseable { + + // Replaces pulsar_ml_AddEntryMessagesRate + public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count"; + private final ObservableLongMeasurement addEntryCounter; + + // Replaces pulsar_ml_AddEntryBytesRate + public static final String BYTES_OUT_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.logical.size"; + private final ObservableLongMeasurement bytesOutCounter; + + // Replaces pulsar_ml_AddEntryWithReplicasBytesRate + public static final String BYTES_OUT_WITH_REPLICAS_COUNTER = + "pulsar.broker.managed_ledger.message.outgoing.replicated.size"; + private final ObservableLongMeasurement bytesOutWithReplicasCounter; + + // Replaces pulsar_ml_NumberOfMessagesInBacklog + public static final String BACKLOG_COUNTER = "pulsar.broker.managed_ledger.backlog.count"; + private final ObservableLongMeasurement backlogCounter; + + // Replaces 
pulsar_ml_ReadEntriesRate + public static final String READ_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.incoming.count"; + private final ObservableLongMeasurement readEntryCounter; + + // Replaces pulsar_ml_ReadEntriesBytesRate + public static final String BYTES_IN_COUNTER = "pulsar.broker.managed_ledger.message.incoming.size"; + private final ObservableLongMeasurement bytesInCounter; + + // Replaces brk_ml_ReadEntriesOpsCacheMissesRate + public static final String READ_ENTRY_CACHE_MISS_COUNTER = + "pulsar.broker.managed_ledger.message.incoming.cache.miss.count"; + private final ObservableLongMeasurement readEntryCacheMissCounter; + + // Replaces pulsar_ml_MarkDeleteRate + public static final String MARK_DELETE_COUNTER = "pulsar.broker.managed_ledger.mark_delete.count"; + private final ObservableLongMeasurement markDeleteCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) { + var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); + + addEntryCounter = meter + .upDownCounterBuilder(ADD_ENTRY_COUNTER) + .setUnit("{operation}") + .setDescription("The number of write operations to this ledger.") + .buildObserver(); + + bytesOutCounter = meter + .counterBuilder(BYTES_OUT_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes written to this ledger, excluding replicas.") + .buildObserver(); + + bytesOutWithReplicasCounter = meter + .counterBuilder(BYTES_OUT_WITH_REPLICAS_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes written to this ledger, including replicas.") + .buildObserver(); + + backlogCounter = meter + .upDownCounterBuilder(BACKLOG_COUNTER) + .setUnit("{message}") + .setDescription("The number of messages in backlog for all consumers from this ledger.") + .buildObserver(); + + readEntryCounter = meter + .upDownCounterBuilder(READ_ENTRY_COUNTER) + 
.setUnit("{operation}") + .setDescription("The number of read operations from this ledger.") + .buildObserver(); + + bytesInCounter = meter + .counterBuilder(BYTES_IN_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes read from this ledger.") + .buildObserver(); + + readEntryCacheMissCounter = meter + .upDownCounterBuilder(READ_ENTRY_CACHE_MISS_COUNTER) + .setUnit("{operation}") + .setDescription("The number of cache misses during read operations from this ledger.") + .buildObserver(); + + markDeleteCounter = meter + .counterBuilder(MARK_DELETE_COUNTER) + .setUnit("{operation}") + .setDescription("The total number of mark delete operations for this ledger.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> factory.getManagedLedgers() + .values() + .forEach(this::recordMetrics), + addEntryCounter, + bytesOutCounter, + bytesOutWithReplicasCounter, + backlogCounter, + readEntryCounter, + bytesInCounter, + readEntryCacheMissCounter, + markDeleteCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetrics(ManagedLedgerImpl ml) { + var stats = ml.getMbean(); + var ledgerAttributeSet = ml.getManagedLedgerAttributes(); + var attributes = ledgerAttributeSet.getAttributes(); + var attributesSucceed = ledgerAttributeSet.getAttributesOperationSucceed(); + var attributesFailure = ledgerAttributeSet.getAttributesOperationFailure(); + + addEntryCounter.record(stats.getAddEntrySucceedTotal(), attributesSucceed); + addEntryCounter.record(stats.getAddEntryErrorsTotal(), attributesFailure); + bytesOutCounter.record(stats.getAddEntryBytesTotal(), attributes); + bytesOutWithReplicasCounter.record(stats.getAddEntryWithReplicasBytesTotal(), attributes); + + readEntryCounter.record(stats.getReadEntriesSucceededTotal(), attributesSucceed); + readEntryCounter.record(stats.getReadEntriesErrorsTotal(), attributesFailure); + bytesInCounter.record(stats.getReadEntriesBytesTotal(), attributes); + + 
backlogCounter.record(stats.getNumberOfMessagesInBacklog(), attributes); + markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes); + readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java index bec73121e48..b9c0ab08e4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -18,27 +18,38 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; +import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.common.Attributes; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedLedgerStats; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import 
org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -68,6 +79,12 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase { super.internalCleanup(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @Test public void testManagedLedgerMetrics() throws Exception { ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar); @@ -76,15 +93,20 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase { List<Metrics> list1 = metrics.generate(); Assert.assertTrue(list1.isEmpty()); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") - .create(); + var topicName = "persistent://my-property/use/my-ns/my-topic1"; + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + + @Cleanup + var consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub1").subscribe(); + for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); } - for (Entry<String, ManagedLedgerImpl> ledger : ((ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory()) - .getManagedLedgers().entrySet()) { + var managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); + for (Entry<String, ManagedLedgerImpl> ledger : managedLedgerFactory.getManagedLedgers().entrySet()) { ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) ledger.getValue().getStats(); stats.refreshStats(1, TimeUnit.SECONDS); } @@ -96,14 +118,78 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase { String message = "my-message-" + i; producer.send(message.getBytes()); } - for (Entry<String, ManagedLedgerImpl> ledger : ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()) - .getManagedLedgers().entrySet()) { + for (Entry<String, ManagedLedgerImpl> ledger : managedLedgerFactory.getManagedLedgers().entrySet()) { ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) ledger.getValue().getStats(); stats.refreshStats(1, TimeUnit.SECONDS); } List<Metrics> list3 = metrics.generate(); Assert.assertEquals(list3.get(0).getMetrics().get(addEntryRateKey), 5.0D); + // Validate OpenTelemetry metrics. 
+ var ledgers = managedLedgerFactory.getManagedLedgers(); + var topicNameObj = TopicName.get(topicName); + var mlName = topicNameObj.getPersistenceNamingEncoding(); + assertThat(ledgers).containsKey(mlName); + var ml = ledgers.get(mlName); + var attribCommon = Attributes.of( + OpenTelemetryAttributes.ML_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace() + ); + var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); + + Awaitility.await().untilAsserted(() -> { + var otelMetrics = metricReader.collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 15); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon, 0); + }); + + for (int i = 0; i < 10; i++) { + var msg = consumer.receive(1, TimeUnit.SECONDS); + consumer.acknowledge(msg); + } + + Awaitility.await().untilAsserted(() -> { + var otelMetrics = metricReader.collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 5); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon, + value -> assertThat(value).isPositive()); + }); + + Awaitility.await().untilAsserted(() -> { + @Cleanup + var cons = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(BrokerTestUtil.newUniqueName("sub")) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + cons.receive(1, TimeUnit.SECONDS); + + var attribSucceed = Attributes.of( + OpenTelemetryAttributes.ML_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace(), + OpenTelemetryAttributes.ML_OPERATION_STATUS, "success" + ); + var attribFailed = Attributes.of( + OpenTelemetryAttributes.ML_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace(), + OpenTelemetryAttributes.ML_OPERATION_STATUS, "failure" + ); 
+ var otelMetrics = metricReader.collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribSucceed, 15); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribFailed, 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.BYTES_OUT_COUNTER, attribCommon, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.BYTES_OUT_WITH_REPLICAS_COUNTER, + attribCommon, value -> assertThat(value).isPositive()); + + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribSucceed, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribFailed, 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.BYTES_IN_COUNTER, attribCommon, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER, + attribCommon, value -> assertThat(value).isPositive()); + }); } @Test diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index f485e300926..24dd1be8509 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -196,6 +196,23 @@ public interface OpenTelemetryAttributes { public final Attributes attributes = Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase()); } + // Managed Ledger Attributes + + /** + * The name of the managed ledger. 
+     */
+    AttributeKey<String> ML_NAME = AttributeKey.stringKey("pulsar.managed_ledger.name");
+
+    /**
+     * The status of the managed ledger operation.
+     */
+    AttributeKey<String> ML_OPERATION_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.operation.status");
+    enum ManagedLedgerOperationStatus {
+        SUCCESS, FAILURE; // each constant is exported as its lower-case name
+        public final Attributes attributes = // Locale.ROOT: default-locale casing (e.g. Turkish) corrupts "failure"
+                Attributes.of(ML_OPERATION_STATUS, name().toLowerCase(java.util.Locale.ROOT));
+    }
+
     /**
      * The type of the pool arena.
      */