asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515713887


##########
pulsar-broker/pom.xml:
##########
@@ -149,6 +149,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>

Review Comment:
   Please add `<scope>test</scope>` here; `opentelemetry-sdk-testing` is only needed by the tests.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,

Review Comment:
   You can use the AssertJ extension that OTel provides, as @dragosvictor did 
on the broker side with `assertThat(metricReader.collectAllMetrics())`. From 
there you get quite nice, readable assertions: 
https://github.com/apache/pulsar/pull/22058/files#diff-6d0bc9489f6aa68a108c36624bd1deb6b65b61cb15ef5da7887f2b6974072e9a



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)

Review Comment:
   This attribute should also be prefixed. Perhaps `pulsar.response.status`, 
which can be either `success` or `failed`? I didn't put `lookup` in the name 
since I presume other client commands will need it too.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = 
ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = 
ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = 
ip.newCounter("pulsar.client.received.count", Unit.Messages,
+                "The number of messages explicitly received by the consumer 
application", topic, Attributes.empty());
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received.size", 
Unit.Bytes,
+                "The number of bytes explicitly received by the consumer 
application", topic, Attributes.empty());
+        messagesPrefetchedGauge = 
ip.newUpDownCounter("pulsar.client.consumer.prefetched.count", Unit.Messages,
+                "Number of messages currently sitting in the consumer 
pre-fetch queue", topic, Attributes.empty());
+        bytesPrefetchedGauge = 
ip.newUpDownCounter("pulsar.client.consumer.prefetched.size", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer 
pre-fetch queue", topic, Attributes.empty());

Review Comment:
   "The size of the messages currently sitting in the consumer pre-fetch queue"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = 
ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = 
ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = 
ip.newCounter("pulsar.client.received.count", Unit.Messages,

Review Comment:
   Why not `pulsar.client.consumer.message.received.count` and 
`pulsar.client.consumer.message.received.size`? That would align with the 
metrics below and with the producer metrics `pulsar.client.producer.message.send.*`.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, 
nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, 
nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, 
Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", 
"tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, 
nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
7, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 7, nsAttrs);
+
+        // Let msg3 to reach ack-timeout
+        Thread.sleep(3000);
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
8, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 8, nsAttrs);
+
+        assertCounterValue(metrics, "pulsar.client.consumer.message.ack", 1, 
nsAttrs);

Review Comment:
   The only thing missing here is `receive`: the number of messages 
received explicitly by calling `receive()` or `receiveBatch()`. It would help 
me figure out, for example, that I received 100 messages but didn't ack or nack them.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")

Review Comment:
   Shouldn't the namespace be just `my-ns`?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);

Review Comment:
   Technically, this metric is an up-down counter rather than a plain counter, so "counter" in the helper name is a bit imprecise here.
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = 
ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   How about "The number of consumer sessions opened"?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, 
nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, 
nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, 
Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", 
"tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, 
nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
7, nsAttrs);

Review Comment:
   How about `pulsar.client.consumer.receive_queue.count`? 
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, 
nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, 
nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, 
Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",

Review Comment:
   Also possible:
   ```
   assertThat(getCounterValue(metrics, "pulsar.client.connections.failed",
       Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()))
       .isGreaterThanOrEqualTo(1L);
   ```
   If it fails, the assertion message shows the actual (smaller) value alongside the expected lower bound (1), which `assertTrue` does not.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, 
nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, 
nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, 
Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", 
"tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, 
nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 
7, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", 
"hello".length() * 7, nsAttrs);
+
+        // Let msg3 to reach ack-timeout
+        Thread.sleep(3000);

Review Comment:
   Is this immediate, or does some background thread need to kick in and mark it as 
ack-timed-out? Asking to make sure we won't run into flaky tests. This can be prevented 
with Awaitility.



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,8 @@ ClientBuilder authentication(String authPluginClassName, 
Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry 
openTelemetry);

Review Comment:
   Maybe add a small Javadoc hinting at where I should obtain an OpenTelemetry 
instance, and what it is used for?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = 
ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = 
ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = 
ip.newCounter("pulsar.client.received.count", Unit.Messages,

Review Comment:
   OK, now I see it: this is the metric I missed earlier in the test.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String 
topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",

Review Comment:
   Just to be clear: this one measures the latency per single message, while the 
RPC one measures the latency of the Send Command RPC method?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, 
Attributes.empty());
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, 
"pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, 
"pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, 
nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, 
nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, 
Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {

Review Comment:
   If you want, there is also:
   ```
   assertThatThrownBy(() -> {
       client.newProducer(Schema.STRING)
           .topic(topic)
           .create();
   })
   .isInstanceOf(Exception.class);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = 
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String 
name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), 
expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String 
name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), 
expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String 
name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());

Review Comment:
   I would remove that.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = 
Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");
+
+        histoGetBroker = 
client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration",

Review Comment:
   I have to say that it doesn't look good to build the same instrument 4 times, 
repeating the description, unit, and name each time. It is bound to confuse.
   Let's at least build the instrument once, and then perhaps build a wrapper 
around it that couples the attributes with the instrument. I'm not a big fan of 
this at all (I prefer using attributes directly), but it's OK.
   Also, the built-in builder is much more readable: you know what each string is, 
whether it's a description, a unit, and so on.
   I'm not afraid that future developers will forget anything, since they are forced 
to add their metrics and attributes to the client docs, and there it will 
immediately be visible if they forgot something: unit, description, etc.
   
   
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String 
topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes 
client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when 
sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = 
ip.newCounter("pulsar.client.producer.message.send.size",
+                Unit.Bytes, "The number of bytes published", topic, 
Attributes.empty());
+        pendingMessagesCounter = 
ip.newUpDownCounter("pulsar.client.producer.message.pending.count", 
Unit.Messages,
+                "Pending messages for this producer", topic, 
Attributes.empty());
+        pendingBytesCounter = 
ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
+                "Pending bytes for this producer", topic, Attributes.empty());

Review Comment:
   "The size of the messages in the producer's internal queue, waiting to be sent"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -356,15 +396,22 @@ public MessageImpl<?> getNextMessage() {
 
             @Override
             public void sendComplete(Exception e) {
+                long latencyNanos = System.nanoTime() - createdAt;
+                pendingMessagesCounter.decrement();
+                pendingBytesCounter.subtract(msgSize);

Review Comment:
   `pendingBytesCounter` --> `pendingBytesUpDownCounter`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String 
topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes 
client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when 
sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = 
ip.newCounter("pulsar.client.producer.message.send.size",

Review Comment:
   Is this counted after the messages have been published successfully?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String 
topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes 
client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = 
ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when 
sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = 
ip.newCounter("pulsar.client.producer.message.send.size",
+                Unit.Bytes, "The number of bytes published", topic, 
Attributes.empty());
+        pendingMessagesCounter = 
ip.newUpDownCounter("pulsar.client.producer.message.pending.count", 
Unit.Messages,
+                "Pending messages for this producer", topic, 
Attributes.empty());

Review Comment:
   "The number of messages in the producer internal send queue, waiting to be 
sent"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
     private static final String BasePathV1 = "lookup/v2/destination/";
     private static final String BasePathV2 = "lookup/v2/topic/";
 
-    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup)
+    private final LatencyHistogram histoGetBroker;
+    private final LatencyHistogram histoGetTopicMetadata;
+    private final LatencyHistogram histoGetSchema;
+    private final LatencyHistogram histoListTopics;
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, 
ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
         this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
+
+        Attributes attrs = 
Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");

Review Comment:
   The class name is `HttpLookupService`, yet the transport type here is "binary".



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -114,6 +121,11 @@ public UnAckedMessageTracker(PulsarClientImpl client, 
ConsumerBase<?> consumerBa
         ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
         this.readLock = readWriteLock.readLock();
         this.writeLock = readWriteLock.writeLock();
+
+        InstrumentProvider ip = client.instrumentProvider();
+        consumerAckTimeoutsCounter = 
ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages,
+                "Number of ack timeouts events", consumerBase.getTopic(), 
Attributes.empty());

Review Comment:
   The description counts "events", but the unit is messages.
   I suggest: "The number of messages that were not acknowledged within the 
configured timeout period and were therefore requested by the client to be 
redelivered"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to