This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit da5329d0adfac8d2fa500cec7f8e99f43f9510aa
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Tue Jun 4 12:57:21 2024 -0700

    [improve][broker] Reduce number of OpenTelemetry consumer attributes (#22837)
    
    (cherry picked from commit 8276f218f576e81c212cedf8b3691f7c1a654e0e)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 36 +++++++++++++++
 .../broker/stats/OpenTelemetryConsumerStats.java   | 54 ++++++----------------
 .../stats/OpenTelemetryConsumerStatsTest.java      | 34 +-------------
 .../opentelemetry/OpenTelemetryAttributes.java     |  5 --
 4 files changed, 51 insertions(+), 78 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fe9fbe6a400..1b19a408124 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.util.concurrent.AtomicDouble;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
+import io.opentelemetry.api.common.Attributes;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -35,6 +36,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 import lombok.Getter;
@@ -69,6 +71,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import 
org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,6 +161,10 @@ public class Consumer {
     @Getter
     private final Instant connectedSince = Instant.now();
 
+    private volatile Attributes openTelemetryAttributes;
+    private static final AtomicReferenceFieldUpdater<Consumer, Attributes> 
OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(Consumer.class, 
Attributes.class, "openTelemetryAttributes");
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
@@ -230,6 +237,8 @@ public class Consumer {
                 
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
 
         this.schemaType = schemaType;
+
+        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
     }
 
     @VisibleForTesting
@@ -262,6 +271,7 @@ public class Consumer {
         this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
         this.schemaType = null;
         MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
+        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
     }
 
     public SubType subType() {
@@ -1202,4 +1212,30 @@ public class Consumer {
     }
 
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
+
+    public Attributes getOpenTelemetryAttributes() {
+        if (openTelemetryAttributes != null) {
+            return openTelemetryAttributes;
+        }
+        return OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, 
oldValue -> {
+            if (oldValue != null) {
+                return oldValue;
+            }
+            var topicName = TopicName.get(subscription.getTopic().getName());
+
+            var builder = Attributes.builder()
+                    .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, 
consumerName)
+                    .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 
consumerId)
+                    .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, 
subscription.getName())
+                    .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, 
subType.toString())
+                    .put(OpenTelemetryAttributes.PULSAR_DOMAIN, 
topicName.getDomain().toString())
+                    .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicName.getTenant())
+                    .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace())
+                    .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicName.getPartitionedTopicName());
+            if (topicName.isPartitioned()) {
+                builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, 
topicName.getPartitionIndex());
+            }
+            return builder.build();
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
index 25af3959db3..09b487a8fa2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.stats;
 
-import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.BatchCallback;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import java.util.Collection;
@@ -27,8 +26,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 
 public class OpenTelemetryConsumerStats implements AutoCloseable {
 
@@ -52,6 +49,9 @@ public class OpenTelemetryConsumerStats implements 
AutoCloseable {
     public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = 
"pulsar.broker.consumer.message.unack.count";
     private final ObservableLongMeasurement messageUnacknowledgedCounter;
 
+    public static final String CONSUMER_BLOCKED_COUNTER = 
"pulsar.broker.consumer.blocked";
+    private final ObservableLongMeasurement consumerBlockedCounter;
+
     // Replaces pulsar_consumer_available_permits
     public static final String MESSAGE_PERMITS_COUNTER = 
"pulsar.broker.consumer.permit.count";
     private final ObservableLongMeasurement messagePermitsCounter;
@@ -91,6 +91,12 @@ public class OpenTelemetryConsumerStats implements 
AutoCloseable {
                 .setDescription("The total number of messages unacknowledged 
by this consumer.")
                 .buildObserver();
 
+        consumerBlockedCounter = meter
+                .upDownCounterBuilder(CONSUMER_BLOCKED_COUNTER)
+                .setUnit("1")
+                .setDescription("Indicates whether the consumer is currently 
blocked due to unacknowledged messages.")
+                .buildObserver();
+
         messagePermitsCounter = meter
                 .upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
                 .setUnit("{permit}")
@@ -114,6 +120,7 @@ public class OpenTelemetryConsumerStats implements 
AutoCloseable {
                 messageAckCounter,
                 messageRedeliverCounter,
                 messageUnacknowledgedCounter,
+                consumerBlockedCounter,
                 messagePermitsCounter);
     }
 
@@ -123,48 +130,13 @@ public class OpenTelemetryConsumerStats implements 
AutoCloseable {
     }
 
     private void recordMetricsForConsumer(Consumer consumer) {
-        var subscription = consumer.getSubscription();
-        var topicName = TopicName.get(subscription.getTopic().getName());
-
-        var builder = Attributes.builder()
-                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, 
consumer.consumerName())
-                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 
consumer.consumerId())
-                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
-                        consumer.getConnectedSince().getEpochSecond())
-                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, 
subscription.getName())
-                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, 
consumer.subType().toString())
-                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, 
topicName.getDomain().toString())
-                .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicName.getTenant())
-                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace())
-                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicName.getPartitionedTopicName());
-        if (topicName.isPartitioned()) {
-            builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, 
topicName.getPartitionIndex());
-        }
-        var clientAddress = consumer.getClientAddressAndPort();
-        if (clientAddress != null) {
-            builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, 
clientAddress);
-        }
-        var clientVersion = consumer.getClientVersion();
-        if (clientVersion != null) {
-            builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, 
clientVersion);
-        }
-        var metadataList = consumer.getMetadata()
-                .entrySet()
-                .stream()
-                .map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
-                .toList();
-        builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, 
metadataList);
-        var attributes = builder.build();
-
+        var attributes = consumer.getOpenTelemetryAttributes();
         messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
         bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
         messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
         messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), 
attributes);
-        messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
-                Attributes.builder()
-                        .putAll(attributes)
-                        .put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, 
consumer.isBlocked())
-                        .build());
+        messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), 
attributes);
+        consumerBlockedCounter.record(consumer.isBlocked() ? 1 : 0, 
attributes);
         messagePermitsCounter.record(consumer.getAvailablePermits(), 
attributes);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
index 5fcc6754b08..a05d7075cf3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
@@ -20,37 +20,25 @@ package org.apache.pulsar.broker.stats;
 
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.doAnswer;
 import io.opentelemetry.api.common.Attributes;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class OpenTelemetryConsumerStatsTest extends BrokerTestBase {
 
-    private BrokerInterceptor brokerInterceptor;
-
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
-        brokerInterceptor =
-                Mockito.mock(BrokerInterceptor.class, 
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
         super.baseSetup();
     }
 
@@ -64,7 +52,6 @@ public class OpenTelemetryConsumerStatsTest extends 
BrokerTestBase {
     protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
         super.customizeMainPulsarTestContextBuilder(builder);
         builder.enableOpenTelemetry(true);
-        builder.brokerInterceptor(brokerInterceptor);
     }
 
     @Test(timeOut = 30_000)
@@ -78,14 +65,6 @@ public class OpenTelemetryConsumerStatsTest extends 
BrokerTestBase {
         var subscriptionName = BrokerTestUtil.newUniqueName("test");
         var receiverQueueSize = 100;
 
-        // Intercept calls to create consumer, in order to fetch client 
information.
-        var consumerRef = new AtomicReference<Consumer>();
-        doAnswer(invocation -> {
-            consumerRef.compareAndSet(null, invocation.getArgument(1));
-            return null;
-        }).when(brokerInterceptor)
-          .consumerCreated(any(), argThat(arg -> 
arg.getSubscription().getName().equals(subscriptionName)), any());
-
         @Cleanup
         var consumer = pulsarClient.newConsumer()
                 .topic(topicName)
@@ -94,12 +73,8 @@ public class OpenTelemetryConsumerStatsTest extends 
BrokerTestBase {
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .receiverQueueSize(receiverQueueSize)
-                .property("prop1", "value1")
                 .subscribe();
 
-        Awaitility.await().until(() -> consumerRef.get() != null);
-        var serverConsumer = consumerRef.get();
-
         @Cleanup
         var producer = pulsarClient.newProducer()
                 .topic(topicName)
@@ -121,11 +96,6 @@ public class OpenTelemetryConsumerStatsTest extends 
BrokerTestBase {
                 .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared.toString())
                 .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, 
consumer.getConsumerName())
                 .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0)
-                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
-                        serverConsumer.getConnectedSince().getEpochSecond())
-                .put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, 
serverConsumer.getClientAddressAndPort())
-                .put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, 
serverConsumer.getClientVersion())
-                .put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, 
List.of("prop1:value1"))
                 .build();
 
         Awaitility.await().untilAsserted(() -> {
@@ -141,9 +111,9 @@ public class OpenTelemetryConsumerStatsTest extends 
BrokerTestBase {
                     actual -> 
assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount - 
ackCount));
 
             var unAckCount = messageCount - ackCount;
-            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER,
-                    
attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, 
false).build(),
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, attributes,
                     unAckCount);
+            assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.CONSUMER_BLOCKED_COUNTER, attributes, 0);
             assertMetricLongSumValue(metrics, 
OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes,
                     actual -> 
assertThat(actual).isGreaterThanOrEqualTo(unAckCount));
         });
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 4f898b382e6..a3e8a0c1e72 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -76,11 +76,6 @@ public interface OpenTelemetryAttributes {
      */
     AttributeKey<Long> PULSAR_CONSUMER_ID = 
AttributeKey.longKey("pulsar.consumer.id");
 
-    /**
-     * Indicates whether the consumer is currently blocked on unacknowledged 
messages or not.
-     */
-    AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED = 
AttributeKey.booleanKey("pulsar.consumer.blocked");
-
     /**
      * The consumer metadata properties, as a list of "key:value" pairs.
      */

Reply via email to