This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e558cfe9836 [feat][broker] PIP-264: Add OpenTelemetry consumer metrics (#22693) e558cfe9836 is described below commit e558cfe9836256065befb3ff6d6043eca10aa5ef Author: Dragos Misca <dragosvic...@users.noreply.github.com> AuthorDate: Fri May 10 15:35:03 2024 -0700 [feat][broker] PIP-264: Add OpenTelemetry consumer metrics (#22693) --- .../org/apache/pulsar/broker/PulsarService.java | 8 + .../org/apache/pulsar/broker/service/Consumer.java | 32 +++- .../broker/stats/OpenTelemetryConsumerStats.java | 170 +++++++++++++++++++++ .../stats/OpenTelemetryConsumerStatsTest.java | 151 ++++++++++++++++++ .../broker/testcontext/PulsarTestContext.java | 1 + .../pulsar/client/api/BrokerServiceLookupTest.java | 1 + .../opentelemetry/OpenTelemetryAttributes.java | 46 ++++++ 7 files changed, 408 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ac37aca531a..6ee35ad295f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -109,6 +109,7 @@ import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.broker.stats.MetricsGenerator; +import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; @@ -254,6 +255,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; private OpenTelemetryTopicStats openTelemetryTopicStats; + private OpenTelemetryConsumerStats openTelemetryConsumerStats; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; @@ -630,8 +632,13 @@ public class PulsarService implements AutoCloseable, ShutdownService { brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); + if (openTelemetryConsumerStats != null) { + openTelemetryConsumerStats.close(); + openTelemetryConsumerStats = null; + } if (openTelemetryTopicStats != null) { openTelemetryTopicStats.close(); + openTelemetryTopicStats = null; } asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup)); @@ -775,6 +782,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { } openTelemetryTopicStats = new OpenTelemetryTopicStats(this); + openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 89a9bab497d..fe9fbe6a400 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects; import com.google.common.util.concurrent.AtomicDouble; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -90,7 +91,9 @@ public class Consumer { private final Rate msgOut; private final Rate msgRedeliver; private final LongAdder msgOutCounter; + private final LongAdder msgRedeliverCounter; private final LongAdder bytesOutCounter; + private final LongAdder messageAckCounter; private final Rate messageAckRate; private volatile long lastConsumedTimestamp; @@ -152,6 +155,9 @@ public class Consumer { @Getter private final SchemaType schemaType; + @Getter + private final Instant connectedSince = Instant.now(); + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, @@ -182,8 +188,10 @@ public class Consumer { this.msgOut = new Rate(); this.chunkedMessageRate = new Rate(); this.msgRedeliver = new Rate(); + this.msgRedeliverCounter = new LongAdder(); this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); + this.messageAckCounter = new LongAdder(); this.messageAckRate = new Rate(); this.appId = appId; @@ -200,7 +208,7 @@ public class Consumer { stats = new ConsumerStatsImpl(); stats.setAddress(cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; - stats.setConnectedSince(DateFormatter.now()); + stats.setConnectedSince(DateFormatter.format(connectedSince)); stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; @@ -238,8 +246,10 @@ public class Consumer { this.consumerName = consumerName; this.msgOut = null; this.msgRedeliver = null; + this.msgRedeliverCounter = null; this.msgOutCounter = null; this.bytesOutCounter = null; + this.messageAckCounter = null; this.messageAckRate = null; this.pendingAcks = null; this.stats = null; @@ -502,6 +512,7 @@ public class Consumer { return future .thenApply(v -> { this.messageAckRate.recordEvent(v); + this.messageAckCounter.add(v); return null; }); } @@ -922,6 +933,14 @@ public class Consumer { return bytesOutCounter.longValue(); } + public long getMessageAckCounter() { + return messageAckCounter.sum(); + } + + public long getMessageRedeliverCounter() { + return msgRedeliverCounter.sum(); + } + public int getUnackedMessages() { return unackedMessages; } @@ -1059,6 +1078,8 @@ public class Consumer { } msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue()); + msgRedeliverCounter.add(totalRedeliveryMessages.intValue()); + subscription.redeliverUnacknowledgedMessages(this, pendingPositions); } else { subscription.redeliverUnacknowledgedMessages(this, consumerEpoch); @@ -1091,6 +1112,7 @@ public class Consumer { subscription.redeliverUnacknowledgedMessages(this, pendingPositions); msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); + msgRedeliverCounter.add(totalRedeliveryMessages); int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0); @@ -1153,6 +1175,14 @@ public class Consumer { return clientAddress; } + public String getClientAddressAndPort() { + return cnx.clientSourceAddressAndPort(); + } + + public String getClientVersion() { + return cnx.getClientVersion(); + } + public MessageId getStartMessageId() { return startMessageId; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java new file mode 100644 index 00000000000..25af3959db3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.Collection; +import java.util.Optional; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; + +public class OpenTelemetryConsumerStats implements AutoCloseable { + + // Replaces pulsar_consumer_msg_rate_out + public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.count"; + private final ObservableLongMeasurement messageOutCounter; + + // Replaces pulsar_consumer_msg_throughput_out + public static final String BYTES_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.size"; + private final ObservableLongMeasurement bytesOutCounter; + + // Replaces pulsar_consumer_msg_ack_rate + public static final String MESSAGE_ACK_COUNTER = "pulsar.broker.consumer.message.ack.count"; + private final ObservableLongMeasurement messageAckCounter; + + // Replaces pulsar_consumer_msg_rate_redeliver + public static final String MESSAGE_REDELIVER_COUNTER = "pulsar.broker.consumer.message.redeliver.count"; + private final ObservableLongMeasurement messageRedeliverCounter; + + // Replaces pulsar_consumer_unacked_messages + public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count"; + private final ObservableLongMeasurement messageUnacknowledgedCounter; + + // Replaces pulsar_consumer_available_permits + public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count"; + private final ObservableLongMeasurement messagePermitsCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryConsumerStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + messageOutCounter = meter + .counterBuilder(MESSAGE_OUT_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages dispatched to this consumer.") + .buildObserver(); + + bytesOutCounter = meter + .counterBuilder(BYTES_OUT_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes dispatched to this consumer.") + .buildObserver(); + + messageAckCounter = meter + .counterBuilder(MESSAGE_ACK_COUNTER) + .setUnit("{ack}") + .setDescription("The total number of message acknowledgments received from this consumer.") + .buildObserver(); + + messageRedeliverCounter = meter + .counterBuilder(MESSAGE_REDELIVER_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages that have been redelivered to this consumer.") + .buildObserver(); + + messageUnacknowledgedCounter = meter + .upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages unacknowledged by this consumer.") + .buildObserver(); + + messagePermitsCounter = meter + .upDownCounterBuilder(MESSAGE_PERMITS_COUNTER) + .setUnit("{permit}") + .setDescription("The number of permits currently available for this consumer.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> pulsar.getBrokerService() + .getTopics() + .values() + .stream() + .map(topicFuture -> topicFuture.getNow(Optional.empty())) + .filter(Optional::isPresent) + .map(Optional::get) + .map(Topic::getSubscriptions) + .flatMap(s -> s.values().stream()) + .map(Subscription::getConsumers) + .flatMap(Collection::stream) + .forEach(this::recordMetricsForConsumer), + messageOutCounter, + bytesOutCounter, + messageAckCounter, + messageRedeliverCounter, + messageUnacknowledgedCounter, + messagePermitsCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetricsForConsumer(Consumer consumer) { + var subscription = consumer.getSubscription(); + var topicName = TopicName.get(subscription.getTopic().getName()); + + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName()) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId()) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE, + consumer.getConnectedSince().getEpochSecond()) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName()) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString()) + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + var clientAddress = consumer.getClientAddressAndPort(); + if (clientAddress != null) { + builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress); + } + var clientVersion = consumer.getClientVersion(); + if (clientVersion != null) { + builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion); + } + var metadataList = consumer.getMetadata() + .entrySet() + .stream() + .map(e -> String.format("%s:%s", e.getKey(), e.getValue())) + .toList(); + builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList); + var attributes = builder.build(); + + messageOutCounter.record(consumer.getMsgOutCounter(), attributes); + bytesOutCounter.record(consumer.getBytesOutCounter(), attributes); + messageAckCounter.record(consumer.getMessageAckCounter(), attributes); + messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes); + messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked()) + .build()); + messagePermitsCounter.record(consumer.getAvailablePermits(), attributes); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java new file mode 100644 index 00000000000..5fcc6754b08 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryConsumerStatsTest extends BrokerTestBase { + + private BrokerInterceptor brokerInterceptor; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + brokerInterceptor = + Mockito.mock(BrokerInterceptor.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS)); + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + builder.brokerInterceptor(brokerInterceptor); + } + + @Test(timeOut = 30_000) + public void testMessagingMetrics() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testConsumerMessagingMetrics"); + admin.topics().createNonPartitionedTopic(topicName); + + var messageCount = 5; + var ackCount = 3; + + var subscriptionName = BrokerTestUtil.newUniqueName("test"); + var receiverQueueSize = 100; + + // Intercept calls to create consumer, in order to fetch client information. + var consumerRef = new AtomicReference<Consumer>(); + doAnswer(invocation -> { + consumerRef.compareAndSet(null, invocation.getArgument(1)); + return null; + }).when(brokerInterceptor) + .consumerCreated(any(), argThat(arg -> arg.getSubscription().getName().equals(subscriptionName)), any()); + + @Cleanup + var consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .receiverQueueSize(receiverQueueSize) + .property("prop1", "value1") + .subscribe(); + + Awaitility.await().until(() -> consumerRef.get() != null); + var serverConsumer = consumerRef.get(); + + @Cleanup + var producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + for (int i = 0; i < messageCount; i++) { + producer.send(String.format("msg-%d", i).getBytes()); + var message = consumer.receive(); + if (i < ackCount) { + consumer.acknowledge(message); + } + } + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscriptionName) + .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared.toString()) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.getConsumerName()) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE, + serverConsumer.getConnectedSince().getEpochSecond()) + .put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, serverConsumer.getClientAddressAndPort()) + .put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, serverConsumer.getClientVersion()) + .put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, List.of("prop1:value1")) + .build(); + + Awaitility.await().untilAsserted(() -> { + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_OUT_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.BYTES_OUT_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_ACK_COUNTER, attributes, ackCount); + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_PERMITS_COUNTER, attributes, + actual -> assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount - ackCount)); + + var unAckCount = messageCount - ackCount; + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, + attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, false).build(), + unAckCount); + assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes, + actual -> assertThat(actual).isGreaterThanOrEqualTo(unAckCount)); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index dceb18cbeaa..09cd4f7cb1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -746,6 +746,7 @@ public class PulsarTestContext implements AutoCloseable { if (builder.enableOpenTelemetry) { var reader = InMemoryMetricReader.create(); openTelemetryMetricReader(reader); + registerCloseable(reader); openTelemetrySdkBuilderCustomizer = BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader); } else { openTelemetrySdkBuilderCustomizer = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 2d2019b38ed..0ad0b01dc1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -191,6 +191,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { // Disable collecting topic stats during this test, as it deadlocks on access to map BrokerService.topics. pulsar2.getOpenTelemetryTopicStats().close(); + pulsar2.getOpenTelemetryConsumerStats().close(); var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore"); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 6088f52f72c..4f898b382e6 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -19,6 +19,7 @@ package org.apache.pulsar.opentelemetry; import io.opentelemetry.api.common.AttributeKey; +import java.util.List; /** * Common OpenTelemetry attributes to be used by Pulsar components. @@ -55,6 +56,51 @@ public interface OpenTelemetryAttributes { */ AttributeKey<Long> PULSAR_PARTITION_INDEX = AttributeKey.longKey("pulsar.partition.index"); + /** + * The name of the Pulsar subscription. + */ + AttributeKey<String> PULSAR_SUBSCRIPTION_NAME = AttributeKey.stringKey("pulsar.subscription.name"); + + /** + * The type of the Pulsar subscription. + */ + AttributeKey<String> PULSAR_SUBSCRIPTION_TYPE = AttributeKey.stringKey("pulsar.subscription.type"); + + /** + * The name of the Pulsar consumer. + */ + AttributeKey<String> PULSAR_CONSUMER_NAME = AttributeKey.stringKey("pulsar.consumer.name"); + + /** + * The ID of the Pulsar consumer. + */ + AttributeKey<Long> PULSAR_CONSUMER_ID = AttributeKey.longKey("pulsar.consumer.id"); + + /** + * Indicates whether the consumer is currently blocked on unacknowledged messages or not. + */ + AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED = AttributeKey.booleanKey("pulsar.consumer.blocked"); + + /** + * The consumer metadata properties, as a list of "key:value" pairs. + */ + AttributeKey<List<String>> PULSAR_CONSUMER_METADATA = AttributeKey.stringArrayKey("pulsar.consumer.metadata"); + + /** + * The UTC timestamp of the Pulsar consumer creation. + */ + AttributeKey<Long> PULSAR_CONSUMER_CONNECTED_SINCE = AttributeKey.longKey("pulsar.consumer.connected_since"); + + /** + * The address of the Pulsar client. + */ + AttributeKey<String> PULSAR_CLIENT_ADDRESS = AttributeKey.stringKey("pulsar.client.address"); + + /** + * The version of the Pulsar client. + */ + AttributeKey<String> PULSAR_CLIENT_VERSION = AttributeKey.stringKey("pulsar.client.version"); + /** * The status of the Pulsar transaction. */