This is an automated email from the ASF dual-hosted git repository.

congbobo184 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9948461ec82 [fix][client] Inject trace context into message properties for producer-consumer span correlation (#25749)
9948461ec82 is described below

commit 9948461ec82ae65ad121c031a0532ee3ad144e86
Author: hrzzzz <[email protected]>
AuthorDate: Thu May 14 17:11:02 2026 +0800

    [fix][client] Inject trace context into message properties for producer-consumer span correlation (#25749)
    
    ### Motivation
    
    The OpenTelemetry producer interceptor creates a send span but does not
    inject the trace context into message properties. On the consumer side,
    extractContext finds nothing in the properties and falls back to
    Context.current(), so consumer spans always start a new trace instead of
    being children of the producer span. As a result, producer and consumer
    spans are impossible to correlate.
    
    ### Modifications
    
    - Replaced the TypedMessageBuilder-based TracingContext.injectContext with
      injectContext(Message, Context, TextMapPropagator), which writes trace
      context directly to the message's underlying MessageMetadata protobuf,
      handling both MessageImpl and TopicMessageImpl wrappers.
    - Called the new injectContext from
      OpenTelemetryProducerInterceptor.beforeSend() after creating the
      producer span.
---
 .../OpenTelemetryTracingIntegrationTest.java       | 61 ++++++++++++++++++++++
 .../tracing/OpenTelemetryProducerInterceptor.java  |  3 ++
 .../pulsar/client/impl/tracing/TracingContext.java | 41 +++++++++------
 3 files changed, 90 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
index 1007f774788..835fb5b4ecf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
 import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.ContextPropagators;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
 import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
 import io.opentelemetry.sdk.trace.SdkTracerProvider;
@@ -70,7 +73,10 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
 
         openTelemetry = OpenTelemetrySdk.builder()
                 .setTracerProvider(tracerProvider)
+                
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
                 .build();
+        GlobalOpenTelemetry.resetForTest();
+        GlobalOpenTelemetry.set(openTelemetry);
 
         baseSetup();
     }
@@ -82,6 +88,7 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
         if (openTelemetry != null) {
             openTelemetry.close();
         }
+        GlobalOpenTelemetry.resetForTest();
     }
 
     private void flushSpans() throws Exception {
@@ -154,6 +161,60 @@ public class OpenTelemetryTracingIntegrationTest extends 
BrokerTestBase {
                 "acknowledge");
     }
 
+    @Test
+    public void testContextPropagationViaMessageProperties() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-context-propagation";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send and receive message
+        producer.send("test-message");
+
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        consumer.acknowledge(msg);
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify spans exist
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 2, "Expected 2 spans, got: " + 
spans.size());
+
+        SpanData producerSpan = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.PRODUCER)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Producer span not 
found"));
+        SpanData consumerSpan = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Consumer span not 
found"));
+
+        // Verify trace context was propagated through message properties:
+        // consumer span must share the same traceId and be a child of the 
producer span
+        assertEquals(producerSpan.getTraceId(), consumerSpan.getTraceId(),
+                "Producer and consumer spans should share the same traceId");
+        assertEquals(consumerSpan.getParentSpanId(), producerSpan.getSpanId(),
+                "Consumer span should be a child of the producer span");
+    }
+
     @Test
     public void testNegativeAcknowledgment() throws Exception {
         String topic = "persistent://prop/ns-abc/test-negative-ack";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
index e93e68e9a96..3b092dd808b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
@@ -83,6 +83,9 @@ public class OpenTelemetryProducerInterceptor implements 
ProducerInterceptor {
             if (TracingContext.isValid(span) && message instanceof 
TraceableMessage) {
                 // Attach the span directly to the message
                 ((TraceableMessage) message).setTracingSpan(span);
+                // Inject trace context into message properties so the consumer
+                // can extract it and correlate its span with this producer 
span
+                TracingContext.injectContext(message, 
Context.current().with(span), propagator);
                 log.debug().attr("topic", topic).log("Created producer span");
             }
         } catch (Exception e) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
index 514d02a64e7..9c5ab4745b0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
@@ -27,10 +27,11 @@ import io.opentelemetry.context.Context;
 import io.opentelemetry.context.propagation.TextMapGetter;
 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.context.propagation.TextMapSetter;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.jspecify.annotations.Nullable;
 
 /**
@@ -51,9 +52,9 @@ public class TracingContext {
         }
     };
 
-    private static final TextMapSetter<Map<String, String>> SETTER = (carrier, 
key, value) -> {
-        if (carrier != null) {
-            carrier.put(key, value);
+    private static final TextMapSetter<MessageMetadata> SETTER = (metadata, 
key, value) -> {
+        if (metadata != null) {
+            metadata.addProperty().setKey(key).setValue(value);
         }
     };
 
@@ -72,24 +73,34 @@ public class TracingContext {
     }
 
     /**
-     * Inject trace context into message properties.
+     * Inject trace context into a message's properties by directly writing
+     * to the underlying {@link MessageMetadata}. This is used by the producer
+     * interceptor at {@code beforeSend} time.
      *
-     * @param messageBuilder the message builder to inject context into
+     * @param message the message to inject context into
      * @param context the context to inject
      * @param propagator the text map propagator to use
      */
-    public static <T> void injectContext(TypedMessageBuilder<T> 
messageBuilder, Context context,
-                                          TextMapPropagator propagator) {
-        if (messageBuilder == null || context == null || propagator == null) {
+    public static void injectContext(Message<?> message, Context context, 
TextMapPropagator propagator) {
+        if (message == null || context == null || propagator == null) {
             return;
         }
+        MessageMetadata metadata = getMessageMetadata(message);
+        if (metadata == null) {
+            return;
+        }
+        propagator.inject(context, metadata, SETTER);
+    }
 
-        Map<String, String> carrier = new HashMap<>();
-        propagator.inject(context, carrier, SETTER);
-
-        for (Map.Entry<String, String> entry : carrier.entrySet()) {
-            messageBuilder.property(entry.getKey(), entry.getValue());
+    @Nullable
+    private static MessageMetadata getMessageMetadata(Message<?> message) {
+        if (message instanceof MessageImpl) {
+            return ((MessageImpl<?>) message).getMessageBuilder();
+        }
+        if (message instanceof TopicMessageImpl) {
+            return getMessageMetadata(((TopicMessageImpl<?>) 
message).getMessage());
         }
+        return null;
     }
 
     /**

Reply via email to