This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e558cfe9836 [feat][broker] PIP-264: Add OpenTelemetry consumer metrics 
(#22693)
e558cfe9836 is described below

commit e558cfe9836256065befb3ff6d6043eca10aa5ef
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Fri May 10 15:35:03 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry consumer metrics (#22693)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   8 +
 .../org/apache/pulsar/broker/service/Consumer.java |  32 +++-
 .../broker/stats/OpenTelemetryConsumerStats.java   | 170 +++++++++++++++++++++
 .../stats/OpenTelemetryConsumerStatsTest.java      | 151 ++++++++++++++++++
 .../broker/testcontext/PulsarTestContext.java      |   1 +
 .../pulsar/client/api/BrokerServiceLookupTest.java |   1 +
 .../opentelemetry/OpenTelemetryAttributes.java     |  46 ++++++
 7 files changed, 408 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ac37aca531a..6ee35ad295f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -109,6 +109,7 @@ import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
@@ -254,6 +255,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private MetricsGenerator metricsGenerator;
     private final PulsarBrokerOpenTelemetry openTelemetry;
     private OpenTelemetryTopicStats openTelemetryTopicStats;
+    private OpenTelemetryConsumerStats openTelemetryConsumerStats;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -630,8 +632,13 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
 
+            if (openTelemetryConsumerStats != null) {
+                openTelemetryConsumerStats.close();
+                openTelemetryConsumerStats = null;
+            }
             if (openTelemetryTopicStats != null) {
                 openTelemetryTopicStats.close();
+                openTelemetryTopicStats = null;
             }
 
             
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
@@ -775,6 +782,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             }
 
             openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
+            openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
 
             localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                     ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 89a9bab497d..fe9fbe6a400 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.util.concurrent.AtomicDouble;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -90,7 +91,9 @@ public class Consumer {
     private final Rate msgOut;
     private final Rate msgRedeliver;
     private final LongAdder msgOutCounter;
+    private final LongAdder msgRedeliverCounter;
     private final LongAdder bytesOutCounter;
+    private final LongAdder messageAckCounter;
     private final Rate messageAckRate;
 
     private volatile long lastConsumedTimestamp;
@@ -152,6 +155,9 @@ public class Consumer {
     @Getter
     private final SchemaType schemaType;
 
+    @Getter
+    private final Instant connectedSince = Instant.now();
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
@@ -182,8 +188,10 @@ public class Consumer {
         this.msgOut = new Rate();
         this.chunkedMessageRate = new Rate();
         this.msgRedeliver = new Rate();
+        this.msgRedeliverCounter = new LongAdder();
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
+        this.messageAckCounter = new LongAdder();
         this.messageAckRate = new Rate();
         this.appId = appId;
 
@@ -200,7 +208,7 @@ public class Consumer {
         stats = new ConsumerStatsImpl();
         stats.setAddress(cnx.clientSourceAddressAndPort());
         stats.consumerName = consumerName;
-        stats.setConnectedSince(DateFormatter.now());
+        stats.setConnectedSince(DateFormatter.format(connectedSince));
         stats.setClientVersion(cnx.getClientVersion());
         stats.metadata = this.metadata;
 
@@ -238,8 +246,10 @@ public class Consumer {
         this.consumerName = consumerName;
         this.msgOut = null;
         this.msgRedeliver = null;
+        this.msgRedeliverCounter = null;
         this.msgOutCounter = null;
         this.bytesOutCounter = null;
+        this.messageAckCounter = null;
         this.messageAckRate = null;
         this.pendingAcks = null;
         this.stats = null;
@@ -502,6 +512,7 @@ public class Consumer {
         return future
                 .thenApply(v -> {
                     this.messageAckRate.recordEvent(v);
+                    this.messageAckCounter.add(v);
                     return null;
                 });
     }
@@ -922,6 +933,14 @@ public class Consumer {
         return bytesOutCounter.longValue();
     }
 
+    public long getMessageAckCounter() {
+        return messageAckCounter.sum();
+    }
+
+    public long getMessageRedeliverCounter() {
+        return msgRedeliverCounter.sum();
+    }
+
     public int getUnackedMessages() {
         return unackedMessages;
     }
@@ -1059,6 +1078,8 @@ public class Consumer {
             }
 
             
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), 
totalRedeliveryMessages.intValue());
+            msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
+
             subscription.redeliverUnacknowledgedMessages(this, 
pendingPositions);
         } else {
             subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
@@ -1091,6 +1112,7 @@ public class Consumer {
 
         subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
         msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, 
totalRedeliveryMessages);
+        msgRedeliverCounter.add(totalRedeliveryMessages);
 
         int numberOfBlockedPermits = 
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
 
@@ -1153,6 +1175,14 @@ public class Consumer {
         return clientAddress;
     }
 
+    public String getClientAddressAndPort() {
+        return cnx.clientSourceAddressAndPort();
+    }
+
+    public String getClientVersion() {
+        return cnx.getClientVersion();
+    }
+
     public MessageId getStartMessageId() {
         return startMessageId;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
new file mode 100644
index 00000000000..25af3959db3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+
+public class OpenTelemetryConsumerStats implements AutoCloseable {
+
+    // Replaces pulsar_consumer_msg_rate_out
+    public static final String MESSAGE_OUT_COUNTER = 
"pulsar.broker.consumer.message.outgoing.count";
+    private final ObservableLongMeasurement messageOutCounter;
+
+    // Replaces pulsar_consumer_msg_throughput_out
+    public static final String BYTES_OUT_COUNTER = 
"pulsar.broker.consumer.message.outgoing.size";
+    private final ObservableLongMeasurement bytesOutCounter;
+
+    // Replaces pulsar_consumer_msg_ack_rate
+    public static final String MESSAGE_ACK_COUNTER = 
"pulsar.broker.consumer.message.ack.count";
+    private final ObservableLongMeasurement messageAckCounter;
+
+    // Replaces pulsar_consumer_msg_rate_redeliver
+    public static final String MESSAGE_REDELIVER_COUNTER = 
"pulsar.broker.consumer.message.redeliver.count";
+    private final ObservableLongMeasurement messageRedeliverCounter;
+
+    // Replaces pulsar_consumer_unacked_messages
+    public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = 
"pulsar.broker.consumer.message.unack.count";
+    private final ObservableLongMeasurement messageUnacknowledgedCounter;
+
+    // Replaces pulsar_consumer_available_permits
+    public static final String MESSAGE_PERMITS_COUNTER = 
"pulsar.broker.consumer.permit.count";
+    private final ObservableLongMeasurement messagePermitsCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryConsumerStats(PulsarService pulsar) {
+        var meter = pulsar.getOpenTelemetry().getMeter();
+
+        messageOutCounter = meter
+                .counterBuilder(MESSAGE_OUT_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages dispatched to 
this consumer.")
+                .buildObserver();
+
+        bytesOutCounter = meter
+                .counterBuilder(BYTES_OUT_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes dispatched 
to this consumer.")
+                .buildObserver();
+
+        messageAckCounter = meter
+                .counterBuilder(MESSAGE_ACK_COUNTER)
+                .setUnit("{ack}")
+                .setDescription("The total number of message acknowledgments 
received from this consumer.")
+                .buildObserver();
+
+        messageRedeliverCounter = meter
+                .counterBuilder(MESSAGE_REDELIVER_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages that have been 
redelivered to this consumer.")
+                .buildObserver();
+
+        messageUnacknowledgedCounter = meter
+                .upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages unacknowledged 
by this consumer.")
+                .buildObserver();
+
+        messagePermitsCounter = meter
+                .upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
+                .setUnit("{permit}")
+                .setDescription("The number of permits currently available for 
this consumer.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
+                        .getTopics()
+                        .values()
+                        .stream()
+                        .map(topicFuture -> 
topicFuture.getNow(Optional.empty()))
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .map(Topic::getSubscriptions)
+                        .flatMap(s -> s.values().stream())
+                        .map(Subscription::getConsumers)
+                        .flatMap(Collection::stream)
+                        .forEach(this::recordMetricsForConsumer),
+                messageOutCounter,
+                bytesOutCounter,
+                messageAckCounter,
+                messageRedeliverCounter,
+                messageUnacknowledgedCounter,
+                messagePermitsCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetricsForConsumer(Consumer consumer) {
+        var subscription = consumer.getSubscription();
+        var topicName = TopicName.get(subscription.getTopic().getName());
+
+        var builder = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, 
consumer.consumerName())
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 
consumer.consumerId())
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
+                        consumer.getConnectedSince().getEpochSecond())
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, 
subscription.getName())
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, 
consumer.subType().toString())
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, 
topicName.getDomain().toString())
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicName.getTenant())
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace())
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicName.getPartitionedTopicName());
+        if (topicName.isPartitioned()) {
+            builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, 
topicName.getPartitionIndex());
+        }
+        var clientAddress = consumer.getClientAddressAndPort();
+        if (clientAddress != null) {
+            builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, 
clientAddress);
+        }
+        var clientVersion = consumer.getClientVersion();
+        if (clientVersion != null) {
+            builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, 
clientVersion);
+        }
+        var metadataList = consumer.getMetadata()
+                .entrySet()
+                .stream()
+                .map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
+                .toList();
+        builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, 
metadataList);
+        var attributes = builder.build();
+
+        messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
+        bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
+        messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
+        messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), 
attributes);
+        messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
+                Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, 
consumer.isBlocked())
+                        .build());
+        messagePermitsCounter.record(consumer.getAvailablePermits(), 
attributes);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
new file mode 100644
index 00000000000..5fcc6754b08
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doAnswer;
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryConsumerStatsTest extends BrokerTestBase {
+
+    private BrokerInterceptor brokerInterceptor;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        brokerInterceptor =
+                Mockito.mock(BrokerInterceptor.class, 
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+        builder.brokerInterceptor(brokerInterceptor);
+    }
+
+    @Test(timeOut = 30_000)
+    public void testMessagingMetrics() throws Exception {
+        var topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testConsumerMessagingMetrics");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        var messageCount = 5;
+        var ackCount = 3;
+
+        var subscriptionName = BrokerTestUtil.newUniqueName("test");
+        var receiverQueueSize = 100;
+
+        // Intercept calls to create consumer, in order to fetch client 
information.
+        var consumerRef = new AtomicReference<Consumer>();
+        doAnswer(invocation -> {
+            consumerRef.compareAndSet(null, invocation.getArgument(1));
+            return null;
+        }).when(brokerInterceptor)
+          .consumerCreated(any(), argThat(arg -> 
arg.getSubscription().getName().equals(subscriptionName)), any());
+
+        @Cleanup
+        var consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .receiverQueueSize(receiverQueueSize)
+                .property("prop1", "value1")
+                .subscribe();
+
+        Awaitility.await().until(() -> consumerRef.get() != null);
+        var serverConsumer = consumerRef.get();
+
+        @Cleanup
+        var producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(String.format("msg-%d", i).getBytes());
+            var message = consumer.receive();
+            if (i < ackCount) {
+                consumer.acknowledge(message);
+            }
+        }
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, 
subscriptionName)
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared.toString())
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, 
consumer.getConsumerName())
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0)
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
+                        serverConsumer.getConnectedSince().getEpochSecond())
+                .put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, 
serverConsumer.getClientAddressAndPort())
+                .put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, 
serverConsumer.getClientVersion())
+                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, 
List.of("prop1:value1"))
+                .build();
+
+        Awaitility.await().untilAsserted(() -> {
+            var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_OUT_COUNTER, attributes,
+                    actual -> assertThat(actual).isPositive());
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.BYTES_OUT_COUNTER, attributes,
+                    actual -> assertThat(actual).isPositive());
+
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_ACK_COUNTER, attributes, ackCount);
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_PERMITS_COUNTER, attributes,
+                    actual -> 
assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount - 
ackCount));
+
+            var unAckCount = messageCount - ackCount;
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER,
+                    
attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, 
false).build(),
+                    unAckCount);
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes,
+                    actual -> 
assertThat(actual).isGreaterThanOrEqualTo(unAckCount));
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index dceb18cbeaa..09cd4f7cb1a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -746,6 +746,7 @@ public class PulsarTestContext implements AutoCloseable {
             if (builder.enableOpenTelemetry) {
                 var reader = InMemoryMetricReader.create();
                 openTelemetryMetricReader(reader);
+                registerCloseable(reader);
                 openTelemetrySdkBuilderCustomizer = 
BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader);
             } else {
                 openTelemetrySdkBuilderCustomizer = null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 2d2019b38ed..0ad0b01dc1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -191,6 +191,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
 
         // Disable collecting topic stats during this test, as it deadlocks on 
access to map BrokerService.topics.
         pulsar2.getOpenTelemetryTopicStats().close();
+        pulsar2.getOpenTelemetryConsumerStats().close();
 
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
         var lookupRequestSemaphoreField = 
BrokerService.class.getDeclaredField("lookupRequestSemaphore");
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 6088f52f72c..4f898b382e6 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.opentelemetry;
 
 import io.opentelemetry.api.common.AttributeKey;
+import java.util.List;
 
 /**
  * Common OpenTelemetry attributes to be used by Pulsar components.
@@ -55,6 +56,51 @@ public interface OpenTelemetryAttributes {
      */
     AttributeKey<Long> PULSAR_PARTITION_INDEX = 
AttributeKey.longKey("pulsar.partition.index");
 
+    /**
+     * The name of the Pulsar subscription.
+     */
+    AttributeKey<String> PULSAR_SUBSCRIPTION_NAME = 
AttributeKey.stringKey("pulsar.subscription.name");
+
+    /**
+     * The type of the Pulsar subscription.
+     */
+    AttributeKey<String> PULSAR_SUBSCRIPTION_TYPE = 
AttributeKey.stringKey("pulsar.subscription.type");
+
+    /**
+     * The name of the Pulsar consumer.
+     */
+    AttributeKey<String> PULSAR_CONSUMER_NAME = 
AttributeKey.stringKey("pulsar.consumer.name");
+
+    /**
+     * The ID of the Pulsar consumer.
+     */
+    AttributeKey<Long> PULSAR_CONSUMER_ID = 
AttributeKey.longKey("pulsar.consumer.id");
+
+    /**
+     * Indicates whether the consumer is currently blocked on unacknowledged 
messages or not.
+     */
+    AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED = 
AttributeKey.booleanKey("pulsar.consumer.blocked");
+
+    /**
+     * The consumer metadata properties, as a list of "key:value" pairs.
+     */
+    AttributeKey<List<String>> PULSAR_CONSUMER_METADATA = 
AttributeKey.stringArrayKey("pulsar.consumer.metadata");
+
+    /**
+     * The UTC timestamp of the Pulsar consumer creation.
+     */
+    AttributeKey<Long> PULSAR_CONSUMER_CONNECTED_SINCE = 
AttributeKey.longKey("pulsar.consumer.connected_since");
+
+    /**
+     * The address of the Pulsar client.
+     */
+    AttributeKey<String> PULSAR_CLIENT_ADDRESS = 
AttributeKey.stringKey("pulsar.client.address");
+
+    /**
+     * The version of the Pulsar client.
+     */
+    AttributeKey<String> PULSAR_CLIENT_VERSION = 
AttributeKey.stringKey("pulsar.client.version");
+
     /**
      * The status of the Pulsar transaction.
      */

Reply via email to