This is an automated email from the ASF dual-hosted git repository.
congbobo184 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8ffeddae6c5 [fix][client] Fix OpenTelemetryProducerInterceptor not
executing due to eligible check (#25585)
8ffeddae6c5 is described below
commit 8ffeddae6c5cb3bd2313d2e709450941f8bec634
Author: hrzzzz <[email protected]>
AuthorDate: Mon May 11 19:20:20 2026 +0800
[fix][client] Fix OpenTelemetryProducerInterceptor not executing due to
eligible check (#25585)
Fixes https://github.com/apache/pulsar/issues/25584
### Motivation
OpenTelemetryProducerInterceptor had a circular initialization dependency
(a chicken-and-egg problem, not a thread deadlock):
1. eligible() returns tracer != null && propagator != null, but both fields
are still null right after construction, so it always returns false.
2. ProducerInterceptors.beforeSend() checks eligible() first, and skips the
interceptor entirely if it returns false.
3. Tracer and propagator are only initialized inside the interceptor's own
beforeSend(), via initializeIfNeeded(producer), which is therefore never
reached.
The interceptor was effectively a no-op — it never created any spans.
### Modifications
Remove the lazy-initialization pattern and inject InstrumentProvider at
construction time instead. The InstrumentProvider is already available in both
ProducerBuilderImpl and ConsumerBuilderImpl where the interceptors are created:
- OpenTelemetryProducerInterceptor — accepts InstrumentProvider in
constructor, initializes tracer and propagator eagerly. Removes
initializeIfNeeded() and the initialized flag.
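- OpenTelemetryConsumerInterceptor — same treatment for consistency. As a
side effect, messageSpansByTopic is now created eagerly in the constructor
for every subscription type, not only for Exclusive/Failover.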
- ProducerBuilderImpl / ConsumerBuilderImpl — pass
client.instrumentProvider() to the constructor.
---
.../OpenTelemetryTracingIntegrationTest.java | 48 +++++++--------
.../pulsar/client/impl/ConsumerBuilderImpl.java | 3 +-
.../pulsar/client/impl/ProducerBuilderImpl.java | 3 +-
.../tracing/OpenTelemetryConsumerInterceptor.java | 72 +++++-----------------
.../tracing/OpenTelemetryProducerInterceptor.java | 30 ++-------
5 files changed, 50 insertions(+), 106 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
index fcc0bd1776e..1007f774788 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
@@ -128,33 +128,30 @@ public class OpenTelemetryTracingIntegrationTest extends
BrokerTestBase {
// Force flush tracer provider
flushSpans();
- // Verify spans - at least one span should be created
+ // Verify spans - expected 2 spans to be created
List<SpanData> spans = spanExporter.getFinishedSpanItems();
- assertTrue(spans.size() > 0, "Expected at least one span, got: " +
spans.size());
+ assertEquals(spans.size(), 2, "Expected 2 spans, got: " +
spans.size());
- // Verify producer span if present
- spans.stream()
+ // Verify producer span
+ SpanData producerSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.PRODUCER)
.findFirst()
- .ifPresent(producerSpan -> {
- assertEquals(producerSpan.getName(), "send " + topic);
- assertEquals(producerSpan.getAttributes().get(
-
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")),
"pulsar");
- });
+ .orElseThrow();
+ assertEquals(producerSpan.getName(), "send " + topic);
+ assertEquals(producerSpan.getAttributes().get(
+
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")),
"pulsar");
- // Verify consumer span if present
- spans.stream()
+ // Verify consumer span
+ SpanData consumerSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.CONSUMER)
.findFirst()
- .ifPresent(consumerSpan -> {
- assertEquals(consumerSpan.getName(), "process " + topic);
- assertEquals(consumerSpan.getAttributes().get(
-
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")),
"pulsar");
- assertEquals(consumerSpan.getAttributes().get(
- io.opentelemetry.api.common.AttributeKey.stringKey(
- "messaging.pulsar.acknowledgment.type")),
- "acknowledge");
- });
+ .orElseThrow();
+ assertEquals(consumerSpan.getName(), "process " + topic);
+ assertEquals(consumerSpan.getAttributes().get(
+
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")),
"pulsar");
+ assertEquals(consumerSpan.getAttributes().get(
+
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")),
+ "acknowledge");
}
@Test
@@ -720,14 +717,16 @@ public class OpenTelemetryTracingIntegrationTest extends
BrokerTestBase {
.subscriptionName("test-sub")
.subscribe();
+ int numMessages = 5;
+
// Send batch of messages
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < numMessages; i++) {
producer.sendAsync("message-" + i);
}
producer.flush();
// Receive and acknowledge all messages
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
@@ -741,9 +740,10 @@ public class OpenTelemetryTracingIntegrationTest extends
BrokerTestBase {
flushSpans();
// Verify spans for batched messages
- // Note: Tracing behavior may vary for batched messages depending on
when spans are created
List<SpanData> spans = spanExporter.getFinishedSpanItems();
- assertTrue(spans.size() > 0, "Expected at least some spans for batched
messages");
+ int expectedNumSpans = numMessages * 2;
+ assertEquals(spans.size(), expectedNumSpans,
+ "Expected " + expectedNumSpans + " spans for batched messages,
got " + spans.size());
// Verify that spans have correct attributes
spans.stream()
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a4b9a52a7fb..296e854c1d5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -233,7 +233,8 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
effectiveInterceptors = new
java.util.ArrayList<>(effectiveInterceptors);
}
effectiveInterceptors.add(
- new
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>());
+ new
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>(
+ client.instrumentProvider()));
}
if (effectiveInterceptors == null || effectiveInterceptors.size()
== 0) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9b9c2c07e49..9242cfd6a08 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -116,7 +116,8 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
} else {
effectiveInterceptors = new ArrayList<>(effectiveInterceptors);
}
- effectiveInterceptors.add(new
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor());
+ effectiveInterceptors.add(new
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor(
+ client.instrumentProvider()));
}
return effectiveInterceptors == null || effectiveInterceptors.size()
== 0
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
index 145ef5fa8c9..79d13cd2249 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
@@ -33,11 +33,8 @@ import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TraceableMessageId;
-import org.apache.pulsar.client.impl.ConsumerBase;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
/**
@@ -64,11 +61,10 @@ import
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
@CustomLog
public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<T> {
- private Tracer tracer;
- private TextMapPropagator propagator;
+ private final Tracer tracer;
+ private final TextMapPropagator propagator;
private String topic;
private String subscription;
- private boolean initialized = false;
/**
* Used for cumulative acknowledgment support (Failover/Exclusive
subscriptions).
@@ -80,10 +76,12 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
* instance handles messages from multiple topic partitions. Cumulative
ack only affects
* messages from the same topic partition.
*/
- private volatile Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>>
messageSpansByTopic;
+ private final Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>>
messageSpansByTopic;
- public OpenTelemetryConsumerInterceptor() {
- // Tracer and propagator will be initialized in beforeConsume when we
have access to the consumer
+ public OpenTelemetryConsumerInterceptor(InstrumentProvider
instrumentProvider) {
+ this.tracer = instrumentProvider.getTracer();
+ this.propagator =
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+ this.messageSpansByTopic = new ConcurrentHashMap<>();
}
/**
@@ -97,52 +95,17 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
return topic != null ? topic : "";
}
- /**
- * Initialize the tracer from the consumer's client.
- * This is called lazily on the first message.
- */
- private void initializeIfNeeded(Consumer<T> consumer) {
- if (!initialized && consumer instanceof ConsumerBase<?> consumerBase) {
- PulsarClientImpl client = consumerBase.getClient();
- InstrumentProvider instrumentProvider =
client.instrumentProvider();
-
- this.tracer = instrumentProvider.getTracer();
- this.propagator =
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
- this.initialized = true;
- if (consumerBase.getConf().getSubscriptionType() ==
SubscriptionType.Exclusive
- || consumerBase.getConf().getSubscriptionType() ==
SubscriptionType.Failover) {
- ensureMapInitialized();
- }
- }
- }
-
- /**
- * Ensure the map is initialized for cumulative acknowledgment support.
- * This is called when we detect cumulative ack is being used.
- */
- private void ensureMapInitialized() {
- if (messageSpansByTopic == null) {
- messageSpansByTopic = new ConcurrentHashMap<>();
- log.debug("Initialized message spans map for cumulative
acknowledgment support");
- }
- }
-
@Override
public void close() {
// Clean up any remaining spans for Failover/Exclusive subscriptions
- if (messageSpansByTopic != null) {
- messageSpansByTopic.values().forEach(topicSpans ->
- topicSpans.values().forEach(TracingContext::endSpan)
- );
- messageSpansByTopic.clear();
- }
+ messageSpansByTopic.values().forEach(topicSpans ->
+ topicSpans.values().forEach(TracingContext::endSpan)
+ );
+ messageSpansByTopic.clear();
}
@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
- // Initialize tracer from consumer on first call
- initializeIfNeeded(consumer);
-
if (tracer == null || propagator == null) {
return message;
}
@@ -162,7 +125,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
MessageId messageId = message.getMessageId();
// Store in map for cumulative ack support (Failover/Exclusive)
- if (messageSpansByTopic != null && messageId instanceof
MessageIdAdv) {
+ if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
messageSpansByTopic.computeIfAbsent(topicKey,
k -> new
ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span);
@@ -204,7 +167,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
((TraceableMessageId) messageId).setTracingSpan(null);
// Remove from map if it exists (Failover/Exclusive)
- if (messageSpansByTopic != null && messageId instanceof
MessageIdAdv) {
+ if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans =
messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
@@ -244,8 +207,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
String topicKey = getTopicKey(messageId);
// Get the topic-specific map
- ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans =
messageSpansByTopic != null
- ? messageSpansByTopic.get(topicKey) : null;
+ ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans =
messageSpansByTopic.get(topicKey);
// First, try to get the span for the cumulative ack position itself
Span currentSpan = null;
@@ -295,7 +257,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
}
// If the cumulative ack position span wasn't in the map, end it
directly
- if (currentSpan != null && messageId instanceof TraceableMessageId) {
+ if (currentSpan != null) {
try {
if (exception != null) {
TracingContext.endSpan(currentSpan, exception);
@@ -327,7 +289,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
((TraceableMessageId) messageId).setTracingSpan(null);
// Remove from map if it exists (Failover/Exclusive)
- if (messageSpansByTopic != null && messageId instanceof
MessageIdAdv) {
+ if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans =
messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
@@ -359,7 +321,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<
((TraceableMessageId) messageId).setTracingSpan(null);
// Remove from map if it exists (Failover/Exclusive)
- if (messageSpansByTopic != null && messageId instanceof
MessageIdAdv) {
+ if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans =
messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
index aa3fea96155..e93e68e9a96 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TraceableMessage;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
-import org.apache.pulsar.client.impl.ProducerBase;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
/**
@@ -45,28 +43,13 @@ import
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
@CustomLog
public class OpenTelemetryProducerInterceptor implements ProducerInterceptor {
- private Tracer tracer;
- private TextMapPropagator propagator;
+ private final Tracer tracer;
+ private final TextMapPropagator propagator;
private String topic;
- private boolean initialized = false;
- public OpenTelemetryProducerInterceptor() {
- // Tracer and propagator will be initialized in beforeSend when we
have access to the producer
- }
-
- /**
- * Initialize the tracer from the producer's client.
- * This is called lazily on the first message.
- */
- private void initializeIfNeeded(Producer producer) {
- if (!initialized && producer instanceof ProducerBase<?> producerBase) {
- PulsarClientImpl client = producerBase.getClient();
- InstrumentProvider instrumentProvider =
client.instrumentProvider();
-
- this.tracer = instrumentProvider.getTracer();
- this.propagator =
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
- this.initialized = true;
- }
+ public OpenTelemetryProducerInterceptor(InstrumentProvider
instrumentProvider) {
+ this.tracer = instrumentProvider.getTracer();
+ this.propagator =
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}
@Override
@@ -82,9 +65,6 @@ public class OpenTelemetryProducerInterceptor implements
ProducerInterceptor {
@Override
public Message<?> beforeSend(Producer<?> producer, Message<?> message) {
- // Initialize tracer from producer on first call
- initializeIfNeeded(producer);
-
if (!eligible(message)) {
return message;
}