This is an automated email from the ASF dual-hosted git repository.

congbobo184 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ffeddae6c5 [fix][client] Fix OpenTelemetryProducerInterceptor not 
executing due to eligible check (#25585)
8ffeddae6c5 is described below

commit 8ffeddae6c5cb3bd2313d2e709450941f8bec634
Author: hrzzzz <[email protected]>
AuthorDate: Mon May 11 19:20:20 2026 +0800

    [fix][client] Fix OpenTelemetryProducerInterceptor not executing due to 
eligible check (#25585)
    
    Fixes https://github.com/apache/pulsar/issues/25584
    
    ### Motivation
    OpenTelemetryProducerInterceptor had a deadlock:
    
    1. eligible() returns tracer != null && propagator != null — both null 
after construction.
    2. ProducerInterceptors.beforeSend() checks eligible() first, and skips the 
interceptor entirely if it returns false.
    3. Tracer and propagator are initialized in beforeSend() via 
initializeIfNeeded(producer), which is never reached.
    
    The interceptor was effectively a no-op — it never created any spans.
    
    ### Modifications
    
    Remove the lazy-initialization pattern and inject InstrumentProvider at 
construction time instead. The InstrumentProvider is already available in both 
ProducerBuilderImpl and ConsumerBuilderImpl where the interceptors are  created:
    - OpenTelemetryProducerInterceptor — accepts InstrumentProvider in 
constructor, initializes tracer and propagator eagerly. Removes 
initializeIfNeeded() and the initialized flag.
    - OpenTelemetryConsumerInterceptor — same treatment for consistency.
    - ProducerBuilderImpl / ConsumerBuilderImpl — pass 
client.instrumentProvider() to the constructor.
---
 .../OpenTelemetryTracingIntegrationTest.java       | 48 +++++++--------
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  3 +-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  3 +-
 .../tracing/OpenTelemetryConsumerInterceptor.java  | 72 +++++-----------------
 .../tracing/OpenTelemetryProducerInterceptor.java  | 30 ++-------
 5 files changed, 50 insertions(+), 106 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
index fcc0bd1776e..1007f774788 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
@@ -128,33 +128,30 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
         // Force flush tracer provider
         flushSpans();
 
-        // Verify spans - at least one span should be created
+        // Verify spans - expected 2 spans to be created
         List<SpanData> spans = spanExporter.getFinishedSpanItems();
-        assertTrue(spans.size() > 0, "Expected at least one span, got: " + 
spans.size());
+        assertEquals(spans.size(), 2, "Expected 2 spans, got: " + 
spans.size());
 
-        // Verify producer span if present
-        spans.stream()
+        // Verify producer span
+        SpanData producerSpan = spans.stream()
                 .filter(s -> s.getKind() == SpanKind.PRODUCER)
                 .findFirst()
-                .ifPresent(producerSpan -> {
-                    assertEquals(producerSpan.getName(), "send " + topic);
-                    assertEquals(producerSpan.getAttributes().get(
-                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
-                });
+                .orElseThrow();
+        assertEquals(producerSpan.getName(), "send " + topic);
+        assertEquals(producerSpan.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
 
-        // Verify consumer span if present
-        spans.stream()
+        // Verify consumer span
+        SpanData consumerSpan = spans.stream()
                 .filter(s -> s.getKind() == SpanKind.CONSUMER)
                 .findFirst()
-                .ifPresent(consumerSpan -> {
-                    assertEquals(consumerSpan.getName(), "process " + topic);
-                    assertEquals(consumerSpan.getAttributes().get(
-                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
-                    assertEquals(consumerSpan.getAttributes().get(
-                            io.opentelemetry.api.common.AttributeKey.stringKey(
-                                    "messaging.pulsar.acknowledgment.type")),
-                            "acknowledge");
-                });
+                .orElseThrow();
+        assertEquals(consumerSpan.getName(), "process " + topic);
+        assertEquals(consumerSpan.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+        assertEquals(consumerSpan.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")),
+                "acknowledge");
     }
 
     @Test
@@ -720,14 +717,16 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
                 .subscriptionName("test-sub")
                 .subscribe();
 
+        int numMessages = 5;
+
         // Send batch of messages
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < numMessages; i++) {
             producer.sendAsync("message-" + i);
         }
         producer.flush();
 
         // Receive and acknowledge all messages
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < numMessages; i++) {
             Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
             assertNotNull(msg);
             consumer.acknowledge(msg);
@@ -741,9 +740,10 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
         flushSpans();
 
         // Verify spans for batched messages
-        // Note: Tracing behavior may vary for batched messages depending on 
when spans are created
         List<SpanData> spans = spanExporter.getFinishedSpanItems();
-        assertTrue(spans.size() > 0, "Expected at least some spans for batched 
messages");
+        int expectedNumSpans = numMessages * 2;
+        assertEquals(spans.size(), expectedNumSpans,
+                "Expected " + expectedNumSpans + " spans for batched messages, 
got " + spans.size());
 
         // Verify that spans have correct attributes
         spans.stream()
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a4b9a52a7fb..296e854c1d5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -233,7 +233,8 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
                     effectiveInterceptors = new 
java.util.ArrayList<>(effectiveInterceptors);
                 }
                 effectiveInterceptors.add(
-                        new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>());
+                        new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>(
+                                client.instrumentProvider()));
             }
 
             if (effectiveInterceptors == null || effectiveInterceptors.size() 
== 0) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9b9c2c07e49..9242cfd6a08 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -116,7 +116,8 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
             } else {
                 effectiveInterceptors = new ArrayList<>(effectiveInterceptors);
             }
-            effectiveInterceptors.add(new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor());
+            effectiveInterceptors.add(new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor(
+                    client.instrumentProvider()));
         }
 
         return effectiveInterceptors == null || effectiveInterceptors.size() 
== 0
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
index 145ef5fa8c9..79d13cd2249 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
@@ -33,11 +33,8 @@ import org.apache.pulsar.client.api.ConsumerInterceptor;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.TraceableMessageId;
-import org.apache.pulsar.client.impl.ConsumerBase;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 
 /**
@@ -64,11 +61,10 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 @CustomLog
 public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<T> {
 
-    private Tracer tracer;
-    private TextMapPropagator propagator;
+    private final Tracer tracer;
+    private final TextMapPropagator propagator;
     private String topic;
     private String subscription;
-    private boolean initialized = false;
 
     /**
      * Used for cumulative acknowledgment support (Failover/Exclusive 
subscriptions).
@@ -80,10 +76,12 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
      * instance handles messages from multiple topic partitions. Cumulative 
ack only affects
      * messages from the same topic partition.
      */
-    private volatile Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> 
messageSpansByTopic;
+    private final Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> 
messageSpansByTopic;
 
-    public OpenTelemetryConsumerInterceptor() {
-        // Tracer and propagator will be initialized in beforeConsume when we 
have access to the consumer
+    public OpenTelemetryConsumerInterceptor(InstrumentProvider 
instrumentProvider) {
+        this.tracer = instrumentProvider.getTracer();
+        this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+        this.messageSpansByTopic = new ConcurrentHashMap<>();
     }
 
     /**
@@ -97,52 +95,17 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
         return topic != null ? topic : "";
     }
 
-    /**
-     * Initialize the tracer from the consumer's client.
-     * This is called lazily on the first message.
-     */
-    private void initializeIfNeeded(Consumer<T> consumer) {
-        if (!initialized && consumer instanceof ConsumerBase<?> consumerBase) {
-            PulsarClientImpl client = consumerBase.getClient();
-            InstrumentProvider instrumentProvider = 
client.instrumentProvider();
-
-            this.tracer = instrumentProvider.getTracer();
-            this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
-            this.initialized = true;
-            if (consumerBase.getConf().getSubscriptionType() == 
SubscriptionType.Exclusive
-                    || consumerBase.getConf().getSubscriptionType() == 
SubscriptionType.Failover) {
-                ensureMapInitialized();
-            }
-        }
-    }
-
-    /**
-     * Ensure the map is initialized for cumulative acknowledgment support.
-     * This is called when we detect cumulative ack is being used.
-     */
-    private void ensureMapInitialized() {
-        if (messageSpansByTopic == null) {
-            messageSpansByTopic = new ConcurrentHashMap<>();
-            log.debug("Initialized message spans map for cumulative 
acknowledgment support");
-        }
-    }
-
     @Override
     public void close() {
         // Clean up any remaining spans for Failover/Exclusive subscriptions
-        if (messageSpansByTopic != null) {
-            messageSpansByTopic.values().forEach(topicSpans ->
-                topicSpans.values().forEach(TracingContext::endSpan)
-            );
-            messageSpansByTopic.clear();
-        }
+        messageSpansByTopic.values().forEach(topicSpans ->
+            topicSpans.values().forEach(TracingContext::endSpan)
+        );
+        messageSpansByTopic.clear();
     }
 
     @Override
     public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
-        // Initialize tracer from consumer on first call
-        initializeIfNeeded(consumer);
-
         if (tracer == null || propagator == null) {
             return message;
         }
@@ -162,7 +125,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
                 MessageId messageId = message.getMessageId();
 
                 // Store in map for cumulative ack support (Failover/Exclusive)
-                if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                if (messageId instanceof MessageIdAdv) {
                     String topicKey = getTopicKey(messageId);
                     messageSpansByTopic.computeIfAbsent(topicKey,
                             k -> new 
ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span);
@@ -204,7 +167,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
                 ((TraceableMessageId) messageId).setTracingSpan(null);
 
                 // Remove from map if it exists (Failover/Exclusive)
-                if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                if (messageId instanceof MessageIdAdv) {
                     String topicKey = getTopicKey(messageId);
                     ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
                     if (topicSpans != null) {
@@ -244,8 +207,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
         String topicKey = getTopicKey(messageId);
 
         // Get the topic-specific map
-        ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic != null
-                ? messageSpansByTopic.get(topicKey) : null;
+        ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
 
         // First, try to get the span for the cumulative ack position itself
         Span currentSpan = null;
@@ -295,7 +257,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
         }
 
         // If the cumulative ack position span wasn't in the map, end it 
directly
-        if (currentSpan != null && messageId instanceof TraceableMessageId) {
+        if (currentSpan != null) {
             try {
                 if (exception != null) {
                     TracingContext.endSpan(currentSpan, exception);
@@ -327,7 +289,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
                     ((TraceableMessageId) messageId).setTracingSpan(null);
 
                     // Remove from map if it exists (Failover/Exclusive)
-                    if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                    if (messageId instanceof MessageIdAdv) {
                         String topicKey = getTopicKey(messageId);
                         ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
                         if (topicSpans != null) {
@@ -359,7 +321,7 @@ public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<
                     ((TraceableMessageId) messageId).setTracingSpan(null);
 
                     // Remove from map if it exists (Failover/Exclusive)
-                    if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                    if (messageId instanceof MessageIdAdv) {
                         String topicKey = getTopicKey(messageId);
                         ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
                         if (topicSpans != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
index aa3fea96155..e93e68e9a96 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.TraceableMessage;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
-import org.apache.pulsar.client.impl.ProducerBase;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 
 /**
@@ -45,28 +43,13 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 @CustomLog
 public class OpenTelemetryProducerInterceptor implements ProducerInterceptor {
 
-    private Tracer tracer;
-    private TextMapPropagator propagator;
+    private final Tracer tracer;
+    private final TextMapPropagator propagator;
     private String topic;
-    private boolean initialized = false;
 
-    public OpenTelemetryProducerInterceptor() {
-        // Tracer and propagator will be initialized in beforeSend when we 
have access to the producer
-    }
-
-    /**
-     * Initialize the tracer from the producer's client.
-     * This is called lazily on the first message.
-     */
-    private void initializeIfNeeded(Producer producer) {
-        if (!initialized && producer instanceof ProducerBase<?> producerBase) {
-            PulsarClientImpl client = producerBase.getClient();
-            InstrumentProvider instrumentProvider = 
client.instrumentProvider();
-
-            this.tracer = instrumentProvider.getTracer();
-            this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
-            this.initialized = true;
-        }
+    public OpenTelemetryProducerInterceptor(InstrumentProvider 
instrumentProvider) {
+        this.tracer = instrumentProvider.getTracer();
+        this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
     }
 
     @Override
@@ -82,9 +65,6 @@ public class OpenTelemetryProducerInterceptor implements 
ProducerInterceptor {
 
     @Override
     public Message<?> beforeSend(Producer<?> producer, Message<?> message) {
-        // Initialize tracer from producer on first call
-        initializeIfNeeded(producer);
-
         if (!eligible(message)) {
             return message;
         }

Reply via email to