This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit da5329d0adfac8d2fa500cec7f8e99f43f9510aa Author: Dragos Misca <dragosvic...@users.noreply.github.com> AuthorDate: Tue Jun 4 12:57:21 2024 -0700 [improve][broker] Reduce number of OpenTelemetry consumer attributes (#22837) (cherry picked from commit 8276f218f576e81c212cedf8b3691f7c1a654e0e) --- .../org/apache/pulsar/broker/service/Consumer.java | 36 +++++++++++++++ .../broker/stats/OpenTelemetryConsumerStats.java | 54 ++++++---------------- .../stats/OpenTelemetryConsumerStatsTest.java | 34 +------------- .../opentelemetry/OpenTelemetryAttributes.java | 5 -- 4 files changed, 51 insertions(+), 78 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fe9fbe6a400..1b19a408124 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects; import com.google.common.util.concurrent.AtomicDouble; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import io.opentelemetry.api.common.Attributes; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -35,6 +36,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import lombok.Getter; @@ -69,6 +71,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,6 +161,10 @@ public class Consumer { @Getter private final Instant connectedSince = Instant.now(); + private volatile Attributes openTelemetryAttributes; + private static final AtomicReferenceFieldUpdater<Consumer, Attributes> OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Consumer.class, Attributes.class, "openTelemetryAttributes"); + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, @@ -230,6 +237,8 @@ public class Consumer { .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); this.schemaType = schemaType; + + OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null); } @VisibleForTesting @@ -262,6 +271,7 @@ public class Consumer { this.isAcknowledgmentAtBatchIndexLevelEnabled = false; this.schemaType = null; MESSAGE_PERMITS_UPDATER.set(this, availablePermits); + OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null); } public SubType subType() { @@ -1202,4 +1212,30 @@ public class Consumer { } private static final Logger log = LoggerFactory.getLogger(Consumer.class); + + public Attributes getOpenTelemetryAttributes() { + if (openTelemetryAttributes != null) { + return openTelemetryAttributes; + } + return OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, oldValue -> { + if (oldValue != null) { + return oldValue; + } + var topicName = TopicName.get(subscription.getTopic().getName()); + + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumerName) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumerId) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName()) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, subType.toString()) + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + return builder.build(); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java index 25af3959db3..09b487a8fa2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.stats; -import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.BatchCallback; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import java.util.Collection; @@ -27,8 +26,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; public class OpenTelemetryConsumerStats implements AutoCloseable { @@ -52,6 +49,9 @@ public class OpenTelemetryConsumerStats implements AutoCloseable { public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count"; private final ObservableLongMeasurement messageUnacknowledgedCounter; + public static final String CONSUMER_BLOCKED_COUNTER = "pulsar.broker.consumer.blocked"; + private final ObservableLongMeasurement consumerBlockedCounter; + // Replaces pulsar_consumer_available_permits public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count"; private final ObservableLongMeasurement messagePermitsCounter; @@ -91,6 +91,12 @@ public class OpenTelemetryConsumerStats implements AutoCloseable { .setDescription("The total number of messages unacknowledged by this consumer.") .buildObserver(); + consumerBlockedCounter = meter + .upDownCounterBuilder(CONSUMER_BLOCKED_COUNTER) + .setUnit("1") + .setDescription("Indicates whether the consumer is currently blocked due to unacknowledged messages.") + .buildObserver(); + messagePermitsCounter = meter .upDownCounterBuilder(MESSAGE_PERMITS_COUNTER) .setUnit("{permit}") @@ -114,6 +120,7 @@ public class OpenTelemetryConsumerStats implements AutoCloseable { messageAckCounter, messageRedeliverCounter, messageUnacknowledgedCounter, + consumerBlockedCounter, messagePermitsCounter); } @@ -123,48 +130,13 @@ public class OpenTelemetryConsumerStats implements AutoCloseable { } private void recordMetricsForConsumer(Consumer consumer) { - var subscription = consumer.getSubscription(); - var topicName = TopicName.get(subscription.getTopic().getName()); - - var builder = Attributes.builder() - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName()) - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId()) - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE, - consumer.getConnectedSince().getEpochSecond()) - .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName()) - .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString()) - .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) - .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) - .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) - .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); - if (topicName.isPartitioned()) { - builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); - } - var clientAddress = consumer.getClientAddressAndPort(); - if (clientAddress != null) { - builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress); - } - var clientVersion = consumer.getClientVersion(); - if (clientVersion != null) { - builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion); - } - var metadataList = consumer.getMetadata() - .entrySet() - .stream() - .map(e -> String.format("%s:%s", e.getKey(), e.getValue())) - .toList(); - builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList); - var attributes = builder.build(); - + var attributes = consumer.getOpenTelemetryAttributes(); messageOutCounter.record(consumer.getMsgOutCounter(), attributes); bytesOutCounter.record(consumer.getBytesOutCounter(), attributes); messageAckCounter.record(consumer.getMessageAckCounter(), attributes); messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes); - messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), - Attributes.builder() - .putAll(attributes) - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked()) - .build()); + messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), attributes); + consumerBlockedCounter.record(consumer.isBlocked() ? 1 : 0, attributes); messagePermitsCounter.record(consumer.getAvailablePermits(), attributes); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java index 5fcc6754b08..a05d7075cf3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java @@ -20,37 +20,25 @@ package org.apache.pulsar.broker.stats; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doAnswer; import io.opentelemetry.api.common.Attributes; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { - private BrokerInterceptor brokerInterceptor; - @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { - brokerInterceptor = - Mockito.mock(BrokerInterceptor.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS)); super.baseSetup(); } @@ -64,7 +52,6 @@ public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { super.customizeMainPulsarTestContextBuilder(builder); builder.enableOpenTelemetry(true); - builder.brokerInterceptor(brokerInterceptor); } @Test(timeOut = 30_000) @@ -78,14 +65,6 @@ public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { var subscriptionName = BrokerTestUtil.newUniqueName("test"); var receiverQueueSize = 100; - // Intercept calls to create consumer, in order to fetch client information. - var consumerRef = new AtomicReference<Consumer>(); - doAnswer(invocation -> { - consumerRef.compareAndSet(null, invocation.getArgument(1)); - return null; - }).when(brokerInterceptor) - .consumerCreated(any(), argThat(arg -> arg.getSubscription().getName().equals(subscriptionName)), any()); - @Cleanup var consumer = pulsarClient.newConsumer() .topic(topicName) @@ -94,12 +73,8 @@ public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { .subscriptionType(SubscriptionType.Shared) .ackTimeout(1, TimeUnit.SECONDS) .receiverQueueSize(receiverQueueSize) - .property("prop1", "value1") .subscribe(); - Awaitility.await().until(() -> consumerRef.get() != null); - var serverConsumer = consumerRef.get(); - @Cleanup var producer = pulsarClient.newProducer() .topic(topicName) @@ -121,11 +96,6 @@ public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared.toString()) .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.getConsumerName()) .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0) - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE, - serverConsumer.getConnectedSince().getEpochSecond()) - .put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, serverConsumer.getClientAddressAndPort()) - .put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, serverConsumer.getClientVersion()) - .put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, List.of("prop1:value1")) .build(); Awaitility.await().untilAsserted(() -> { @@ -141,9 +111,9 @@ public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { actual -> assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount - ackCount)); var unAckCount = messageCount - ackCount; - assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, - attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, false).build(), + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, attributes, unAckCount); + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.CONSUMER_BLOCKED_COUNTER, attributes, 0); assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes, actual -> assertThat(actual).isGreaterThanOrEqualTo(unAckCount)); }); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 4f898b382e6..a3e8a0c1e72 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -76,11 +76,6 @@ public interface OpenTelemetryAttributes { */ AttributeKey<Long> PULSAR_CONSUMER_ID = AttributeKey.longKey("pulsar.consumer.id"); - /** - * Indicates whether the consumer is currently blocked on unacknowledged messages or not. - */ - AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED = AttributeKey.booleanKey("pulsar.consumer.blocked"); - /** * The consumer metadata properties, as a list of "key:value" pairs. */