This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a373258d043 [feat][client] PIP-446: Support Native OpenTelemetry 
Tracing in Pulsar Java Client (#24873)
a373258d043 is described below

commit a373258d0430aa04cc05c4c46b0ac5e0e38153cb
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jan 14 22:10:41 2026 +0800

    [feat][client] PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java 
Client (#24873)
    
    Co-authored-by: Penghui Li <[email protected]>
---
 .../OpenTelemetryTracingIntegrationTest.java       | 826 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |  38 +
 .../apache/pulsar/client/api/TraceableMessage.java |  55 ++
 .../pulsar/client/api/TraceableMessageId.java      |  56 ++
 pulsar-client/TRACING.md                           | 406 ++++++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |   6 +
 .../apache/pulsar/client/impl/ConsumerBase.java    |   1 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  16 +-
 .../apache/pulsar/client/impl/MessageIdImpl.java   |  23 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  22 +-
 .../pulsar/client/impl/NegativeAcksTracker.java    |  27 +-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  15 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   2 +-
 .../pulsar/client/impl/TopicMessageIdImpl.java     |  21 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |  23 +-
 .../client/impl/conf/ClientConfigurationData.java  |   7 +
 .../client/impl/metrics/InstrumentProvider.java    |  10 +
 .../tracing/OpenTelemetryConsumerInterceptor.java  | 373 ++++++++++
 .../tracing/OpenTelemetryProducerInterceptor.java  | 143 ++++
 .../pulsar/client/impl/tracing/TracingContext.java | 194 +++++
 .../pulsar/client/impl/tracing/package-info.java   | 139 ++++
 .../client/impl/ConsumerBuilderImplTest.java       |   8 +-
 .../client/impl/ProducerBuilderImplTest.java       |   2 +
 .../impl/tracing/OpenTelemetryTracingTest.java     | 212 ++++++
 .../client/impl/tracing/TracingExampleTest.java    | 179 +++++
 .../pulsar/functions/instance/ContextImplTest.java |   2 +
 26 files changed, 2793 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
new file mode 100644
index 00000000000..fcc0bd1776e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
@@ -0,0 +1,826 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for OpenTelemetry tracing with real broker.
+ * Note: These tests may be timing-dependent and could be flaky in CI 
environments.
+ * They verify end-to-end tracing functionality with actual Pulsar broker.
+ */
+@Test(groups = "broker")
+public class OpenTelemetryTracingIntegrationTest extends BrokerTestBase {
+
+    private InMemorySpanExporter spanExporter;
+    private OpenTelemetrySdk openTelemetry;
+    private SdkTracerProvider tracerProvider;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        // Setup OpenTelemetry SDK with in-memory exporter
+        spanExporter = InMemorySpanExporter.create();
+        tracerProvider = SdkTracerProvider.builder()
+                .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+                .build();
+
+        openTelemetry = OpenTelemetrySdk.builder()
+                .setTracerProvider(tracerProvider)
+                .build();
+
+        baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (openTelemetry != null) {
+            openTelemetry.close();
+        }
+    }
+
+    private void flushSpans() throws Exception {
+        tracerProvider.forceFlush().join(5, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testBasicProducerConsumerTracing() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-basic-tracing";
+        spanExporter.reset();
+
+        // Create client with tracing enabled
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        // Create producer
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        // Create consumer
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send and receive message
+        MessageId sentMsgId = producer.send("test-message");
+        assertNotNull(sentMsgId);
+
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), "test-message");
+        consumer.acknowledge(msg);
+
+        // Close client to force span flush
+        producer.close();
+        consumer.close();
+        client.close();
+
+        // Force flush tracer provider
+        flushSpans();
+
+        // Verify spans - at least one span should be created
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertTrue(spans.size() > 0, "Expected at least one span, got: " + 
spans.size());
+
+        // Verify producer span if present
+        spans.stream()
+                .filter(s -> s.getKind() == SpanKind.PRODUCER)
+                .findFirst()
+                .ifPresent(producerSpan -> {
+                    assertEquals(producerSpan.getName(), "send " + topic);
+                    assertEquals(producerSpan.getAttributes().get(
+                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+                });
+
+        // Verify consumer span if present
+        spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .findFirst()
+                .ifPresent(consumerSpan -> {
+                    assertEquals(consumerSpan.getName(), "process " + topic);
+                    assertEquals(consumerSpan.getAttributes().get(
+                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+                    assertEquals(consumerSpan.getAttributes().get(
+                            io.opentelemetry.api.common.AttributeKey.stringKey(
+                                    "messaging.pulsar.acknowledgment.type")),
+                            "acknowledge");
+                });
+    }
+
+    @Test
+    public void testNegativeAcknowledgment() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-negative-ack";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .negativeAckRedeliveryDelay(0, TimeUnit.SECONDS)
+                .subscribe();
+
+        // Send message
+        producer.send("test-message");
+
+        // Receive and negative acknowledge
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        consumer.negativeAcknowledge(msg);
+
+        Thread.sleep(3000);
+
+        // Close to ensure negative ack is processed
+        producer.close();
+        consumer.close();
+        client.close();
+
+        // Wait for spans
+        flushSpans();
+
+        // Find consumer span
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        SpanData consumerSpan = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Consumer span not 
found. Total spans: "
+                        + spans.size() + ", kinds: " + spans.stream()
+                        .map(s -> 
s.getKind().toString()).collect(java.util.stream.Collectors.joining(", "))));
+
+        // Verify negative ack attribute
+        assertEquals(consumerSpan.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")),
+                "negative_acknowledge");
+    }
+
+    @Test
+    public void testCumulativeAcknowledgment() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-cumulative-ack";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        // Send multiple messages
+        for (int i = 0; i < 5; i++) {
+            producer.send("message-" + i);
+        }
+
+        // Receive all messages
+        Message<String> lastMsg = null;
+        for (int i = 0; i < 5; i++) {
+            lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(lastMsg);
+        }
+
+        // Cumulative acknowledge last message
+        consumer.acknowledgeCumulative(lastMsg);
+
+        // Wait for spans
+        flushSpans();
+
+        // Verify all consumer spans have cumulative_acknowledge attribute
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpansWithCumulativeAck = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .filter(s -> 
"cumulative_acknowledge".equals(s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type"))))
+                .count();
+
+        assertEquals(consumerSpansWithCumulativeAck, 5);
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testAcknowledgmentTimeout() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-ack-timeout";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscribe();
+
+        // Send message
+        producer.send("test-message");
+
+        // Receive but don't acknowledge
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+
+        // Note: Ack timeout behavior varies based on subscription type and 
broker implementation
+        // For Shared subscription, ack timeout triggers redelivery but span 
may already be ended
+        // This test verifies the basic tracing flow works even with ack 
timeout configured
+
+        // Acknowledge to properly end the span
+        consumer.acknowledge(msg);
+
+        // Wait for spans
+        flushSpans();
+
+        // Verify consumer span exists with acknowledge attribute
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        boolean foundConsumerSpan = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .anyMatch(s -> s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type")) != 
null);
+
+        assertTrue(foundConsumerSpan, "Expected consumer span with 
acknowledgment.type attribute");
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testMultiTopicConsumerTracing() throws Exception {
+        String topic1 = "persistent://prop/ns-abc/test-multi-topic-1";
+        String topic2 = "persistent://prop/ns-abc/test-multi-topic-2";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer1 = client.newProducer(Schema.STRING)
+                .topic(topic1)
+                .create();
+
+        Producer<String> producer2 = client.newProducer(Schema.STRING)
+                .topic(topic2)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topics(List.of(topic1, topic2))
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send messages to both topics
+        producer1.send("message-topic1");
+        producer2.send("message-topic2");
+
+        // Receive and acknowledge both messages
+        Set<String> receivedTopics = new java.util.HashSet<>();
+        for (int i = 0; i < 2; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            receivedTopics.add(msg.getTopicName());
+            consumer.acknowledge(msg);
+        }
+
+        assertEquals(receivedTopics.size(), 2);
+
+        // Wait for spans
+        flushSpans();
+
+        // Verify spans for both topics
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpans = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .count();
+
+        assertEquals(consumerSpans, 2);
+
+        producer1.close();
+        producer2.close();
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testTracingWithoutGlobalEnable() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-no-global-tracing";
+        spanExporter.reset();
+
+        // Create client with OpenTelemetry but tracing NOT enabled
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(false)  // Explicitly disabled
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send and receive message
+        producer.send("test-message");
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        consumer.acknowledge(msg);
+
+        // Wait for potential spans
+        flushSpans();
+
+        // Verify NO spans were created
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 0, "Expected no spans when tracing is 
disabled");
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testSharedSubscriptionTracing() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-shared-subscription";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        // Send messages
+        for (int i = 0; i < 3; i++) {
+            producer.send("message-" + i);
+        }
+
+        // Receive and acknowledge individually
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify individual acks for Shared subscription
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpansWithIndividualAck = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .filter(s -> "acknowledge".equals(s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type"))))
+                .count();
+
+        assertEquals(consumerSpansWithIndividualAck, 3);
+    }
+
+    @Test
+    public void testKeySharedSubscriptionTracing() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-key-shared-subscription";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-key-shared-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        // Send messages with keys
+        for (int i = 0; i < 3; i++) {
+            producer.newMessage()
+                    .key("key-" + (i % 2))
+                    .value("message-" + i)
+                    .send();
+        }
+
+        // Receive and acknowledge
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify spans for Key_Shared subscription
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpans = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .count();
+
+        assertEquals(consumerSpans, 3);
+    }
+
+    @Test
+    public void testExclusiveSubscriptionTracing() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-exclusive-subscription";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-exclusive-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+        // Send messages
+        for (int i = 0; i < 3; i++) {
+            producer.send("message-" + i);
+        }
+
+        // Receive all messages
+        Message<String> lastMsg = null;
+        for (int i = 0; i < 3; i++) {
+            lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(lastMsg);
+        }
+
+        // Cumulative acknowledge last message
+        consumer.acknowledgeCumulative(lastMsg);
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify cumulative ack for Exclusive subscription
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpansWithCumulativeAck = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .filter(s -> 
"cumulative_acknowledge".equals(s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type"))))
+                .count();
+
+        assertEquals(consumerSpansWithCumulativeAck, 3);
+    }
+
+    @Test
+    public void testFailoverSubscriptionWithCumulativeAck() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-failover-cumulative";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-failover-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        // Send messages
+        for (int i = 0; i < 5; i++) {
+            producer.send("message-" + i);
+        }
+
+        // Receive all messages
+        Message<String> lastMsg = null;
+        for (int i = 0; i < 5; i++) {
+            lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(lastMsg);
+        }
+
+        // Cumulative acknowledge last message
+        consumer.acknowledgeCumulative(lastMsg);
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify all spans ended with cumulative ack
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpansWithCumulativeAck = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .filter(s -> 
"cumulative_acknowledge".equals(s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type"))))
+                .count();
+
+        assertEquals(consumerSpansWithCumulativeAck, 5);
+    }
+
+    @Test
+    public void testMultiTopicConsumerWithCumulativeAck() throws Exception {
+        String topic1 = "persistent://prop/ns-abc/test-multi-cumulative-1";
+        String topic2 = "persistent://prop/ns-abc/test-multi-cumulative-2";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer1 = client.newProducer(Schema.STRING)
+                .topic(topic1)
+                .create();
+
+        Producer<String> producer2 = client.newProducer(Schema.STRING)
+                .topic(topic2)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topics(List.of(topic1, topic2))
+                .subscriptionName("test-multi-cumulative-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        // Send messages to both topics
+        producer1.send("topic1-msg1");
+        producer1.send("topic1-msg2");
+        producer2.send("topic2-msg1");
+        producer2.send("topic2-msg2");
+
+        // Receive messages from both topics
+        Message<String> topic1LastMsg = null;
+        Message<String> topic2LastMsg = null;
+        for (int i = 0; i < 4; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            if (msg.getTopicName().contains("multi-cumulative-1")) {
+                topic1LastMsg = msg;
+            } else {
+                topic2LastMsg = msg;
+            }
+        }
+
+        // Cumulative acknowledge for each topic separately
+        if (topic1LastMsg != null) {
+            consumer.acknowledgeCumulative(topic1LastMsg);
+        }
+        if (topic2LastMsg != null) {
+            consumer.acknowledgeCumulative(topic2LastMsg);
+        }
+
+        producer1.close();
+        producer2.close();
+        consumer.close();
+        client.close();
+
+        flushSpans();
+
+        // Verify cumulative ack only affects spans from the same topic
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        long consumerSpansWithCumulativeAck = spans.stream()
+                .filter(s -> s.getKind() == SpanKind.CONSUMER)
+                .filter(s -> 
"cumulative_acknowledge".equals(s.getAttributes().get(
+                        io.opentelemetry.api.common.AttributeKey.stringKey(
+                                "messaging.pulsar.acknowledgment.type"))))
+                .count();
+
+        // Should have cumulative ack for messages from both topics
+        assertEquals(consumerSpansWithCumulativeAck, 4);
+    }
+
+    @Test
+    public void testBatchMessagesTracing() throws Exception {
+        String topic = "persistent://prop/ns-abc/test-batch-tracing";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(5)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send batch of messages
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync("message-" + i);
+        }
+        producer.flush();
+
+        // Receive and acknowledge all messages
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        // Wait for spans
+        flushSpans();
+
+        // Verify spans for batched messages
+        // Note: Tracing behavior may vary for batched messages depending on 
when spans are created
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertTrue(spans.size() > 0, "Expected at least some spans for batched 
messages");
+
+        // Verify that spans have correct attributes
+        spans.stream()
+                .filter(s -> s.getKind() == SpanKind.PRODUCER || s.getKind() 
== SpanKind.CONSUMER)
+                .forEach(span -> {
+                    assertNotNull(span.getAttributes().get(
+                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")));
+                    assertEquals(span.getAttributes().get(
+                            
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+                });
+    }
+
+    @Test
+    public void testCustomSpan() throws Exception {
+        String topic = "persistent://prop/ns-abc/testCustomSpan";
+        spanExporter.reset();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(openTelemetry)
+                .enableTracing(true)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(5)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        // Send batch of messages
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync("message-" + i);
+        }
+        producer.flush();
+
+        InstrumentProvider instrumentProvider = ((PulsarClientImpl) 
client).instrumentProvider();
+        final Tracer tracer = instrumentProvider.getTracer();
+        String customSpanName = "business-logic";
+        // Receive and acknowledge all messages
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            Span processingSpan = tracer.spanBuilder(customSpanName)
+                    .setSpanKind(SpanKind.CLIENT).startSpan();
+            try (Scope scope = processingSpan.makeCurrent()) {
+                processingSpan.setStatus(StatusCode.OK);
+                consumer.acknowledge(msg);
+            } catch (Exception e) {
+                processingSpan.recordException(e);
+                processingSpan.setStatus(StatusCode.ERROR);
+                consumer.negativeAcknowledge(msg);
+                throw e;
+            } finally {
+                processingSpan.end();
+            }
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+
+        // Wait for spans
+        flushSpans();
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertTrue(spans.size() > 0, "Expected at least some spans for batched 
messages");
+
+        spans.stream()
+                .filter(s -> s.getName().equals(customSpanName))
+                .forEach(span -> {
+                    assertEquals(span.getKind(), SpanKind.CLIENT);
+                });
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index d31d42bbe63..7ac063d227b 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -615,6 +615,44 @@ public interface ClientBuilder extends Serializable, 
Cloneable {
      */
     ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry 
openTelemetry);
 
+    /**
+     * Enable OpenTelemetry distributed tracing.
+     *
+     * <p>When enabled, interceptors are automatically added to all producers 
and consumers
+     * to create spans for message publishing and consumption, and 
automatically propagate trace context
+     * via message properties.
+     *
+     * <p>This method is useful when OpenTelemetry is configured globally 
(e.g., via Java Agent or
+     * {@link io.opentelemetry.api.GlobalOpenTelemetry}) and you just want to 
enable tracing interceptors
+     * without explicitly setting an OpenTelemetry instance.
+     *
+     * <p>Example with Java Agent:
+     * <pre>{@code
+     * // When using -javaagent:opentelemetry-javaagent.jar
+     * PulsarClient client = PulsarClient.builder()
+     *     .serviceUrl("pulsar://localhost:6650")
+     *     .enableTracing(true)  // Use GlobalOpenTelemetry
+     *     .build();
+     * }</pre>
+     *
+     * <p>Example with GlobalOpenTelemetry:
+     * <pre>{@code
+     * // Configure GlobalOpenTelemetry elsewhere in your application
+     * GlobalOpenTelemetry.set(myOpenTelemetry);
+     *
+     * // Just enable tracing in the client
+     * PulsarClient client = PulsarClient.builder()
+     *     .serviceUrl("pulsar://localhost:6650")
+     *     .enableTracing(true)
+     *     .build();
+     * }</pre>
+     *
+     * @param tracingEnabled whether to enable tracing (default: false)
+     * @return the client builder instance
+     * @since 4.2.0
+     */
+    ClientBuilder enableTracing(boolean tracingEnabled);
+
     /**
      * The clock used by the pulsar client.
      *
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java
new file mode 100644
index 00000000000..5e4c61778ab
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * Extension of {@link Message} interface that supports OpenTelemetry tracing.
+ * <p>
+ * This interface allows attaching OpenTelemetry spans directly to messages,
+ * eliminating the need for external tracking via maps.
+ * <p>
+ * The span lifecycle:
+ * <ul>
+ *   <li>Producer: Span is created before send and attached to the message.
+ *       When the send is acknowledged, the span is retrieved and 
completed.</li>
+ *   <li>Consumer: Span is created when message is received and attached to 
the message.
+ *       When the message is acknowledged, the span is retrieved and 
completed.</li>
+ * </ul>
+ */
+public interface TraceableMessage {
+
+    /**
+     * Set the OpenTelemetry span associated with this message.
+     * <p>
+     * This method is called by tracing interceptors to attach a span to the 
message
+     * for later retrieval when completing the span.
+     *
+     * @param span the span to associate with this message, or null to clear
+     */
+    void setTracingSpan(Span span);
+
+    /**
+     * Get the OpenTelemetry span associated with this message.
+     *
+     * @return the span associated with this message, or null if no span is set
+     */
+    Span getTracingSpan();
+}
\ No newline at end of file
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java
new file mode 100644
index 00000000000..d8470184ccc
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * Extension interface that allows {@link MessageId} implementations to 
support OpenTelemetry tracing.
+ * <p>
+ * This interface enables attaching OpenTelemetry spans directly to message 
IDs,
+ * allowing span retrieval in acknowledge callbacks which only receive 
MessageId,
+ * not the full Message object.
+ * <p>
+ * This is particularly useful for consumer-side tracing where:
+ * <ul>
+ *   <li>A span is created when a message is received (in beforeConsume)</li>
+ *   <li>The span is attached to the message's MessageId</li>
+ *   <li>When the message is acknowledged, the span can be retrieved from the 
MessageId
+ *       and completed, even though the acknowledge callback only provides 
MessageId</li>
+ * </ul>
+ */
+public interface TraceableMessageId {
+
+    /**
+     * Set the OpenTelemetry span associated with this message ID.
+     * <p>
+     * This method is called by tracing interceptors to attach a span to the 
message ID
+     * for later retrieval in acknowledge callbacks.
+     *
+     * @param span the span to associate with this message ID, or null to clear
+     */
+    void setTracingSpan(Span span);
+
+    /**
+     * Get the OpenTelemetry span associated with this message ID.
+     *
+     * @return the span associated with this message ID, or null if no span is 
set
+     */
+    Span getTracingSpan();
+}
diff --git a/pulsar-client/TRACING.md b/pulsar-client/TRACING.md
new file mode 100644
index 00000000000..4dc8d26809b
--- /dev/null
+++ b/pulsar-client/TRACING.md
@@ -0,0 +1,406 @@
+# OpenTelemetry Tracing for Pulsar Java Client
+
+This document describes how to use OpenTelemetry distributed tracing with the 
Pulsar Java client.
+
+## Overview
+
+The Pulsar Java client provides built-in support for OpenTelemetry distributed 
tracing. This allows you to:
+
+- Trace message publishing from producer to broker
+- Trace message consumption from broker to consumer
+- Propagate trace context across services via message properties
+- Extract trace context from external sources (e.g., HTTP requests)
+- Create end-to-end traces across your distributed system
+
+## Features
+
+### Producer Tracing
+
+Producer tracing creates spans for:
+- **send** - Span starts when `send()` or `sendAsync()` is called and 
completes when broker acknowledges receipt
+
+### Consumer Tracing
+
+Consumer tracing creates spans for:
+- **process** - Span starts when message is received and completes when 
message is acknowledged, negatively acknowledged, or ack timeout occurs
+
+### Trace Context Propagation
+
+Trace context is automatically propagated using W3C TraceContext format:
+- `traceparent` - Contains trace ID, span ID, and trace flags
+- `tracestate` - Contains vendor-specific trace information
+
+Context is injected into and extracted from message properties, enabling 
seamless trace propagation across services.
+
+## Quick Start
+
+### 1. Add Dependencies
+
+The Pulsar client already includes OpenTelemetry API dependencies. You'll need 
to add the SDK and exporters:
+
+```xml
+<dependency>
+    <groupId>io.opentelemetry</groupId>
+    <artifactId>opentelemetry-sdk</artifactId>
+    <version>${opentelemetry.version}</version>
+</dependency>
+<dependency>
+    <groupId>io.opentelemetry</groupId>
+    <artifactId>opentelemetry-exporter-otlp</artifactId>
+    <version>${opentelemetry.version}</version>
+</dependency>
+```
+
+### 2. Enable Tracing
+
+There are three ways to enable tracing:
+
+#### Option 1: Using OpenTelemetry Java Agent (Easiest)
+
+```bash
+# Start your application with the Java Agent
+java -javaagent:opentelemetry-javaagent.jar \
+     -Dotel.service.name=my-service \
+     -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
+     -jar your-application.jar
+```
+
+```java
+// Just enable tracing - uses GlobalOpenTelemetry from the agent
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://localhost:6650")
+    .enableTracing(true)  // That's it!
+    .build();
+```
+
+#### Option 2: With Explicit OpenTelemetry Instance
+
+```java
+OpenTelemetry openTelemetry = // configure your OpenTelemetry instance
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://localhost:6650")
+    .openTelemetry(openTelemetry, true)  // Set OpenTelemetry AND enable 
tracing
+    .build();
+```
+
+#### Option 3: Using GlobalOpenTelemetry
+
+```java
+// Configure GlobalOpenTelemetry once in your application
+GlobalOpenTelemetry.set(myOpenTelemetry);
+
+// Enable tracing in the client - uses GlobalOpenTelemetry
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://localhost:6650")
+    .enableTracing(true)
+    .build();
+```
+
+**What happens when tracing is enabled:**
+- **Create spans** for producer send operations
+- **Inject trace context** into message properties automatically
+- **Create spans** for consumer receive/ack operations
+- **Extract trace context** from message properties automatically
+- Link all spans to create end-to-end distributed traces
+
+### 3. Manual Interceptor Configuration (Advanced)
+
+If you prefer manual control, you can add interceptors explicitly:
+
+```java
+import org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor;
+import org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor;
+
+// Create client (tracing not enabled globally)
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://localhost:6650")
+    .openTelemetry(openTelemetry)
+    .build();
+
+// Add interceptor manually to specific producer
+Producer<String> producer = client.newProducer(Schema.STRING)
+    .topic("my-topic")
+    .intercept(new OpenTelemetryProducerInterceptor())
+    .create();
+
+// Add interceptor manually to specific consumer
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+    .topic("my-topic")
+    .subscriptionName("my-subscription")
+    .intercept(new OpenTelemetryConsumerInterceptor<>())
+    .subscribe();
+```
+
+## Advanced Usage
+
+### End-to-End Tracing Example
+
+This example shows how to create a complete trace from an HTTP request through 
Pulsar to a consumer:
+
+```java
+// Service 1: HTTP API that publishes to Pulsar
+@POST
+@Path("/order")
+public Response createOrder(@Context HttpHeaders headers, Order order) {
+    // Extract trace context from incoming HTTP request
+    Context context = TracingProducerBuilder.extractFromHeaders(
+        convertHeaders(headers));
+
+    // Publish to Pulsar with trace context
+    TracingProducerBuilder tracingBuilder = new TracingProducerBuilder();
+    producer.newMessage()
+        .value(order)
+        .let(builder -> tracingBuilder.injectContext(builder, context))
+        .send();
+
+    return Response.accepted().build();
+}
+
+// Service 2: Pulsar consumer that processes orders
+Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
+    .topic("orders")
+    .subscriptionName("order-processor")
+    .intercept(new OpenTelemetryConsumerInterceptor<>())
+    .subscribe();
+
+while (true) {
+    Message<Order> msg = consumer.receive();
+    // Trace context is automatically extracted
+    // Any spans created here will be part of the same trace
+    processOrder(msg.getValue());
+    consumer.acknowledge(msg);
+}
+```
+
+### Custom Span Creation
+
+You can create custom spans during message processing:
+
+```java
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+
+Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-app");
+
+Message<String> msg = consumer.receive();
+
+// Create a custom span for processing
+Span span = tracer.spanBuilder("process-message")
+    .setSpanKind(SpanKind.INTERNAL)
+    .startSpan();
+
+try (Scope scope = span.makeCurrent()) {
+    // Your processing logic
+    processMessage(msg.getValue());
+    span.setStatus(StatusCode.OK);
+} catch (Exception e) {
+    span.recordException(e);
+    span.setStatus(StatusCode.ERROR);
+    throw e;
+} finally {
+    span.end();
+    consumer.acknowledge(msg);
+}
+```
+
+## Configuration
+
+### Compatibility with OpenTelemetry Java Agent
+
+This implementation is **fully compatible** with the [OpenTelemetry Java 
Instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/pulsar)
 for Pulsar:
+
+- Both use **W3C TraceContext** format (traceparent, tracestate headers)
+- Both propagate context via **message properties**
+- **No conflicts**: Our implementation checks if trace context is already 
present (from Java Agent) and avoids duplicate injection
+- You can use either approach or both together
+
+### Using OpenTelemetry Java Agent
+
+The easiest way to enable tracing is using the OpenTelemetry Java Agent 
(automatic instrumentation):
+
+```bash
+java -javaagent:path/to/opentelemetry-javaagent.jar \
+     -Dotel.service.name=my-service \
+     -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
+     -jar your-application.jar
+```
+
+**Note**: When using the Java Agent, you don't need to call 
`.openTelemetry(otel, true)` as the agent automatically instruments Pulsar. 
However, calling it won't cause conflicts.
+
+### Programmatic Configuration
+
+You can also configure OpenTelemetry programmatically:
+
+```java
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+
+OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
+    .setEndpoint("http://localhost:4317";)
+    .build();
+
+SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
+    .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
+    .build();
+
+OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder()
+    .setTracerProvider(tracerProvider)
+    .buildAndRegisterGlobal();
+```
+
+### Environment Variables
+
+Configure via environment variables:
+
+```bash
+export OTEL_SERVICE_NAME=my-service
+export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
+export OTEL_TRACES_EXPORTER=otlp
+export OTEL_METRICS_EXPORTER=otlp
+```
+
+## Span Attributes
+
+The tracing implementation adds the following attributes to spans following 
the [OpenTelemetry messaging semantic 
conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/):
+
+### Producer Spans
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.operation.name`: "send"
+- `messaging.message.id`: Message ID (added when broker confirms)
+
+**Span naming**: `send {topic}` (e.g., "send my-topic")
+
+### Consumer Spans
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.destination.subscription.name`: Subscription name
+- `messaging.operation.name`: "process"
+- `messaging.message.id`: Message ID
+- `messaging.pulsar.acknowledgment.type`: How the message was acknowledged
+  - `"acknowledge"`: Normal individual acknowledgment
+  - `"cumulative_acknowledge"`: Cumulative acknowledgment
+  - `"negative_acknowledge"`: Message negatively acknowledged (will retry)
+  - `"ack_timeout"`: Acknowledgment timeout occurred (will retry)
+
+**Span naming**: `process {topic}` (e.g., "process my-topic")
+
+## Span Lifecycle and Acknowledgment Behavior
+
+Understanding how spans are handled for different acknowledgment scenarios. 
Every consumer span includes a `messaging.pulsar.acknowledgment.type` attribute 
indicating how it was completed:
+
+### Successful Acknowledgment
+- Span ends with **OK** status
+- Attribute: `messaging.pulsar.acknowledgment.type = "acknowledge"`
+
+### Cumulative Acknowledgment
+- Span ends with **OK** status
+- Attribute: `messaging.pulsar.acknowledgment.type = "cumulative_acknowledge"`
+- All spans up to the acknowledged position are ended with this attribute
+
+### Negative Acknowledgment
+- Span ends with **OK** status (not an error)
+- Attribute: `messaging.pulsar.acknowledgment.type = "negative_acknowledge"`
+- This is normal flow, not a failure - the message will be redelivered and a 
new span will be created
+
+### Acknowledgment Timeout
+- Span ends with **OK** status (not an error)
+- Attribute: `messaging.pulsar.acknowledgment.type = "ack_timeout"`
+- This is expected behavior when `ackTimeout` is configured - the message will 
be redelivered and a new span will be created
+
+### Application Exception During Processing
+- If your application code throws an exception, create a child span and mark 
it with ERROR status
+- The consumer span itself will end normally when you call 
`negativeAcknowledge()`
+- This provides clear separation between messaging operations (OK) and 
application logic (ERROR)
+
+**Example - Separating messaging and application errors**:
+```java
+Message<String> msg = consumer.receive();
+Span processingSpan = tracer.spanBuilder("business-logic").startSpan();
+try (Scope scope = processingSpan.makeCurrent()) {
+    processMessage(msg.getValue());
+    processingSpan.setStatus(StatusCode.OK);
+    consumer.acknowledge(msg);  // Consumer span ends with 
acknowledgment.type="acknowledge"
+} catch (Exception e) {
+    processingSpan.recordException(e);
+    processingSpan.setStatus(StatusCode.ERROR);  // Business logic failed
+    consumer.negativeAcknowledge(msg);  // Consumer span ends with 
acknowledgment.type="negative_acknowledge"
+    throw e;
+} finally {
+    processingSpan.end();
+}
+```
+
+### Querying by Acknowledgment Type
+
+The `messaging.pulsar.acknowledgment.type` attribute allows you to filter and 
analyze spans:
+
+**Example queries in your tracing backend**:
+- Find all retried messages: `messaging.pulsar.acknowledgment.type = 
"negative_acknowledge" OR "ack_timeout"`
+- Calculate retry rate: `count(negative_acknowledge) / count(acknowledge)`
+- Identify timeout issues: `messaging.pulsar.acknowledgment.type = 
"ack_timeout"`
+- Analyze cumulative vs individual acks: Group by 
`messaging.pulsar.acknowledgment.type`
+
+## Best Practices
+
+1. **Always use interceptors**: Add tracing interceptors to both producers and 
consumers for complete visibility.
+
+2. **Propagate context from HTTP**: When publishing from HTTP endpoints, 
always extract and propagate the trace context.
+
+3. **Handle errors properly**: Ensure spans are ended even when exceptions 
occur.
+
+4. **Distinguish messaging vs. application errors**:
+   - Messaging operations (nack, timeout) end with OK status + events
+   - Application failures should be tracked in separate child spans with ERROR 
status
+
+5. **Use meaningful span names**: The default span names include the topic 
name for easy identification.
+
+6. **Consider performance**: Tracing adds minimal overhead, but in 
high-throughput scenarios, consider sampling.
+
+7. **Clean up resources**: Ensure interceptors and OpenTelemetry SDK are 
properly closed when shutting down.
+
+## Troubleshooting
+
+### Traces not appearing
+
+1. Verify OpenTelemetry SDK is configured and exporters are set up
+2. Check that interceptors are added to producers/consumers
+3. Verify trace exporter endpoint is reachable
+4. Enable debug logging: `-Dio.opentelemetry.javaagent.debug=true`
+
+### Missing parent-child relationships
+
+1. Ensure trace context is being injected via 
`TracingProducerBuilder.injectContext()`
+2. Verify message properties contain `traceparent` header
+3. Check that both producer and consumer have tracing interceptors
+
+### High overhead
+
+1. Consider using sampling: `-Dotel.traces.sampler=parentbased_traceidratio 
-Dotel.traces.sampler.arg=0.1`
+2. Use batch span processor (default)
+3. Adjust batch processor settings if needed
+
+## Examples
+
+See the following files for complete examples:
+- `TracingExampleTest.java` - Comprehensive usage examples
+- `OpenTelemetryTracingTest.java` - Unit tests demonstrating API usage
+
+## API Reference
+
+### Main Classes
+
+- `OpenTelemetryProducerInterceptor` - Producer interceptor for tracing
+- `OpenTelemetryConsumerInterceptor` - Consumer interceptor for tracing
+- `TracingContext` - Utility methods for span creation and context propagation
+- `TracingProducerBuilder` - Helper for injecting trace context into messages
+
+## Additional Resources
+
+- [OpenTelemetry Java 
Documentation](https://opentelemetry.io/docs/instrumentation/java/)
+- [W3C Trace Context Specification](https://www.w3.org/TR/trace-context/)
+- [Pulsar Documentation](https://pulsar.apache.org/docs/)
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 9bbd9cebca8..7c8529ceeb7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -157,6 +157,12 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder enableTracing(boolean tracingEnabled) {
+        conf.setTracingEnabled(tracingEnabled);
+        return this;
+    }
+
     @Override
     public ClientBuilder authentication(String authPluginClassName, String 
authParamsString)
             throws UnsupportedAuthenticationException {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 749d23651aa..1755c94b0de 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -80,6 +80,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected static final double 
MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
 
     protected final String subscription;
+    @Getter
     protected final ConsumerConfigurationData<T> conf;
     protected final String consumerName;
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dc2363c279f..11bf617e2fd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -213,10 +213,22 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
             applyDLQConfig = CompletableFuture.completedFuture(null);
         }
         return applyDLQConfig.thenCompose(__ -> {
-            if (interceptorList == null || interceptorList.size() == 0) {
+            // Automatically add tracing interceptor if tracing is enabled
+            List<ConsumerInterceptor<T>> effectiveInterceptors = 
interceptorList;
+            if (client.getConfiguration().isTracingEnabled()) {
+                if (effectiveInterceptors == null) {
+                    effectiveInterceptors = new java.util.ArrayList<>();
+                } else {
+                    effectiveInterceptors = new 
java.util.ArrayList<>(effectiveInterceptors);
+                }
+                effectiveInterceptors.add(
+                        new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>());
+            }
+
+            if (effectiveInterceptors == null || effectiveInterceptors.size() 
== 0) {
                 return client.subscribeAsync(conf, schema, null);
             } else {
-                return client.subscribeAsync(conf, schema, new 
ConsumerInterceptors<>(interceptorList));
+                return client.subscribeAsync(conf, schema, new 
ConsumerInterceptors<>(effectiveInterceptors));
             }
         });
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 8cffba44dc5..6446a9b60da 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -25,14 +25,23 @@ import java.io.IOException;
 import java.util.Objects;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.TraceableMessageId;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
 
-public class MessageIdImpl implements MessageIdAdv {
+public class MessageIdImpl implements MessageIdAdv, TraceableMessageId {
+    private static final long serialVersionUID = 1L;
+
     protected final long ledgerId;
     protected final long entryId;
     protected final int partitionIndex;
 
+    /**
+     * OpenTelemetry tracing span associated with this message ID.
+     * Used for distributed tracing support via the TraceableMessageId 
interface.
+     */
+    private transient io.opentelemetry.api.trace.Span tracingSpan;
+
     // Private constructor used only for json deserialization
     @SuppressWarnings("unused")
     private MessageIdImpl() {
@@ -188,4 +197,16 @@ public class MessageIdImpl implements MessageIdAdv {
         // there is no message batch so we pass -1
         return toByteArray(-1, 0);
     }
+
+    // TraceableMessageId implementation for OpenTelemetry support
+
+    @Override
+    public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+        this.tracingSpan = span;
+    }
+
+    @Override
+    public io.opentelemetry.api.trace.Span getTracingSpan() {
+        return this.tracingSpan;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4d7b6cc4734..c964db57505 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.TraceableMessage;
 import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -60,7 +61,7 @@ import org.apache.pulsar.common.schema.SchemaIdUtil;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-public class MessageImpl<T> implements Message<T> {
+public class MessageImpl<T> implements TraceableMessage, Message<T> {
 
     protected MessageId messageId;
     private final MessageMetadata msgMetadata;
@@ -84,6 +85,13 @@ public class MessageImpl<T> implements Message<T> {
     private boolean poolMessage;
     @Getter
     private long consumerEpoch;
+
+    /**
+     * OpenTelemetry tracing span associated with this message.
+     * Used for distributed tracing support via the TraceableMessage interface.
+     */
+    private transient io.opentelemetry.api.trace.Span tracingSpan;
+
     // Constructor for out-going message
     public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, 
ByteBuffer payload, Schema<T> schema,
             String topic) {
@@ -844,6 +852,18 @@ public class MessageImpl<T> implements Message<T> {
         return payload;
     }
 
+    // TraceableMessage implementation for OpenTelemetry support
+
+    @Override
+    public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+        this.tracingSpan = span;
+    }
+
+    @Override
+    public io.opentelemetry.api.trace.Span getTracingSpan() {
+        return this.tracingSpan;
+    }
+
     enum SchemaState {
         None, Ready, Broken
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 273880569c3..e0ec16f507e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMess
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
+import io.opentelemetry.api.trace.Span;
 import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
@@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
+import org.apache.pulsar.client.api.TraceableMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
@@ -47,6 +49,7 @@ class NegativeAcksTracker implements Closeable {
     // different timestamp, there will be multiple entries in the map
     // RB Tree -> LongOpenHashMap -> Roaring64Bitmap
     private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> 
nackedMessages = null;
+    private final Long2ObjectMap<Long2ObjectMap<MessageId>> nackedMessageIds = 
new Long2ObjectOpenHashMap<>();
 
     private final ConsumerBase<?> consumer;
     private final Timer timer;
@@ -89,7 +92,17 @@ class NegativeAcksTracker implements Closeable {
                     long ledgerId = ledgerEntry.getLongKey();
                     Roaring64Bitmap entrySet = ledgerEntry.getValue();
                     entrySet.forEach(entryId -> {
-                        MessageId msgId = new MessageIdImpl(ledgerId, entryId, 
DUMMY_PARTITION_INDEX);
+                        MessageId msgId = null;
+                        Long2ObjectMap<MessageId> entryMap = 
nackedMessageIds.get(ledgerId);
+                        if (entryMap != null) {
+                            msgId = entryMap.remove(entryId);
+                            if (entryMap.isEmpty()) {
+                                nackedMessageIds.remove(ledgerId);
+                            }
+                        }
+                        if (msgId == null) {
+                            msgId = new MessageIdImpl(ledgerId, entryId, 
DUMMY_PARTITION_INDEX);
+                        }
                         addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
                         messagesToRedeliver.add(msgId);
                     });
@@ -143,6 +156,15 @@ class NegativeAcksTracker implements Closeable {
     }
 
     private synchronized void add(MessageId messageId, int redeliveryCount) {
+        if (messageId instanceof TraceableMessageId) {
+            Span span = ((TraceableMessageId) messageId).getTracingSpan();
+            if (span != null) {
+                MessageIdAdv msgId = (MessageIdAdv) messageId;
+                nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new 
Long2ObjectOpenHashMap<>())
+                        .put(msgId.getEntryId(), messageId);
+            }
+        }
+
         if (nackedMessages == null) {
             nackedMessages = new Long2ObjectAVLTreeMap<>();
         }
@@ -201,5 +223,8 @@ class NegativeAcksTracker implements Closeable {
             nackedMessages.clear();
             nackedMessages = null;
         }
+        if (nackedMessageIds != null) {
+            nackedMessageIds.clear();
+        }
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 7c33cba9645..e176cc41bc6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -106,9 +106,20 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
             return FutureUtil.failedFuture(pce);
         }
 
-        return interceptorList == null || interceptorList.size() == 0
+        // Automatically add tracing interceptor if tracing is enabled
+        List<ProducerInterceptor> effectiveInterceptors = interceptorList;
+        if (client.getConfiguration().isTracingEnabled()) {
+            if (effectiveInterceptors == null) {
+                effectiveInterceptors = new ArrayList<>();
+            } else {
+                effectiveInterceptors = new ArrayList<>(effectiveInterceptors);
+            }
+            effectiveInterceptors.add(new 
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor());
+        }
+
+        return effectiveInterceptors == null || effectiveInterceptors.size() 
== 0
                 ? client.createProducerAsync(conf, schema, null)
-                : client.createProducerAsync(conf, schema, new 
ProducerInterceptors(interceptorList));
+                : client.createProducerAsync(conf, schema, new 
ProducerInterceptors(effectiveInterceptors));
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3a2ff97f51e..605757c230c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1403,7 +1403,7 @@ public class PulsarClientImpl implements PulsarClient {
         return scheduledExecutorProvider;
     }
 
-    InstrumentProvider instrumentProvider() {
+    public InstrumentProvider instrumentProvider() {
         return instrumentProvider;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 3dc9b23e93e..872fe283fb9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -22,8 +22,9 @@ import java.util.BitSet;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.TraceableMessageId;
 
-public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
+public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId, 
TraceableMessageId {
 
     private final String ownerTopic;
     private final MessageIdAdv msgId;
@@ -129,4 +130,22 @@ public class TopicMessageIdImpl implements MessageIdAdv, 
TopicMessageId {
     public String toString() {
         return msgId.toString();
     }
+
+    // TraceableMessageId implementation for OpenTelemetry support
+    // Delegates to the wrapped MessageIdAdv if it implements 
TraceableMessageId
+
+    @Override
+    public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+        if (msgId instanceof TraceableMessageId) {
+            ((TraceableMessageId) msgId).setTracingSpan(span);
+        }
+    }
+
+    @Override
+    public io.opentelemetry.api.trace.Span getTracingSpan() {
+        if (msgId instanceof TraceableMessageId) {
+            return ((TraceableMessageId) msgId).getTracingSpan();
+        }
+        return null;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 7b9916b58fc..19533b21601 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -18,15 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TraceableMessage;
 import org.apache.pulsar.common.api.EncryptionContext;
 
-public class TopicMessageImpl<T> implements Message<T> {
+public class TopicMessageImpl<T> implements TraceableMessage, Message<T> {
 
     /** This topicPartitionName is get from ConsumerImpl, it contains 
partition part. */
     private final String topicPartitionName;
@@ -65,6 +67,7 @@ public class TopicMessageImpl<T> implements Message<T> {
     }
 
     @Override
+    @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "messageId is 
immutable")
     public MessageId getMessageId() {
         return messageId;
     }
@@ -226,4 +229,22 @@ public class TopicMessageImpl<T> implements Message<T> {
         return msg.getIndex();
     }
 
+    // TraceableMessage implementation for OpenTelemetry support
+    // Delegates to the wrapped message if it implements TraceableMessage
+
+    @Override
+    public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+        if (msg instanceof TraceableMessage) {
+            ((TraceableMessage) msg).setTracingSpan(span);
+        }
+    }
+
+    @Override
+    public io.opentelemetry.api.trace.Span getTracingSpan() {
+        if (msg instanceof TraceableMessage) {
+            return ((TraceableMessage) msg).getTracingSpan();
+        }
+        return null;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index e406581e707..df6e01a73f5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -437,6 +437,13 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
 
     private transient OpenTelemetry openTelemetry;
 
+    @ApiModelProperty(
+            name = "tracingEnabled",
+            value = "Whether to enable OpenTelemetry distributed tracing. When 
enabled, "
+                    + "tracing interceptors are automatically added to 
producers and consumers."
+    )
+    private boolean tracingEnabled = false;
+
     /**
      * Gets the authentication settings for the client.
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
index a0bdd8b6fb6..d73baef3a0c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
@@ -24,6 +24,7 @@ import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.api.trace.Tracer;
 import java.util.function.Consumer;
 import org.apache.pulsar.PulsarVersion;
 
@@ -32,6 +33,7 @@ public class InstrumentProvider {
     public static final InstrumentProvider NOOP = new 
InstrumentProvider(OpenTelemetry.noop());
 
     private final Meter meter;
+    private final Tracer tracer;
 
     public InstrumentProvider(OpenTelemetry otel) {
         if (otel == null) {
@@ -43,6 +45,10 @@ public class InstrumentProvider {
                 .meterBuilder("org.apache.pulsar.client")
                 .setInstrumentationVersion(PulsarVersion.getVersion())
                 .build();
+        this.tracer = otel.getTracerProvider()
+                .tracerBuilder("org.apache.pulsar.client")
+                .setInstrumentationVersion(PulsarVersion.getVersion())
+                .build();
     }
 
     public Counter newCounter(String name, Unit unit, String description, 
String topic, Attributes attributes) {
@@ -63,4 +69,8 @@ public class InstrumentProvider {
                                                             
Consumer<ObservableLongMeasurement> callback) {
         return new ObservableUpDownCounter(meter, name, unit, description, 
topic, attributes, callback);
     }
+
+    public Tracer getTracer() {
+        return tracer;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
new file mode 100644
index 00000000000..eece10b76ff
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.TraceableMessageId;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OpenTelemetry consumer interceptor that creates spans for message 
consumption.
+ * <p>
+ * This interceptor automatically retrieves the Tracer from the client's 
InstrumentProvider,
+ * ensuring consistent OpenTelemetry configuration across the client.
+ * <p>
+ * <b>Span Storage Strategy:</b>
+ * <ul>
+ *   <li><b>Shared/Key_Shared subscriptions:</b> Spans are attached directly 
to {@link TraceableMessageId}
+ *       instances with zero map overhead.</li>
+ *   <li><b>Failover/Exclusive subscriptions:</b> A nested map is initialized 
eagerly to track message IDs
+ *       and their spans in sorted order. This is necessary because cumulative 
ack must end spans
+ *       for all messages up to the acked position.</li>
+ * </ul>
+ * <p>
+ * <b>Multi-Topic Consumer Support:</b><br>
+ * For {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl} and 
pattern-based consumers, cumulative
+ * acknowledgment only affects messages from the same topic partition. The 
interceptor uses a nested
+ * map structure (topic partition → message IDs) and {@link 
TopicMessageId#getOwnerTopic()} to ensure
+ * spans are only ended for messages from the acknowledged topic partition.
+ */
+public class OpenTelemetryConsumerInterceptor<T> implements 
ConsumerInterceptor<T> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(OpenTelemetryConsumerInterceptor.class);
+
+    private Tracer tracer;
+    private TextMapPropagator propagator;
+    private String topic;
+    private String subscription;
+    private boolean initialized = false;
+
+    /**
+     * Used for cumulative acknowledgment support (Failover/Exclusive 
subscriptions).
+     * Outer map: topic partition -> (message ID -> span)
+     * Inner ConcurrentSkipListMap maintains sorted order for efficient range 
operations.
+     * Initialized eagerly for Failover/Exclusive subscriptions.
+     * <p>
+     * The nested structure is necessary for multi-topic consumers where a 
single interceptor
+     * instance handles messages from multiple topic partitions. Cumulative 
ack only affects
+     * messages from the same topic partition.
+     */
+    private volatile Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> 
messageSpansByTopic;
+
+    public OpenTelemetryConsumerInterceptor() {
+        // Tracer and propagator will be initialized in beforeConsume when we 
have access to the consumer
+    }
+
+    /**
+     * Get the topic key for a message ID.
+     * For TopicMessageId, returns the owner topic. Otherwise returns the 
consumer's topic.
+     */
+    private String getTopicKey(MessageId messageId) {
+        if (messageId instanceof TopicMessageId) {
+            return ((TopicMessageId) messageId).getOwnerTopic();
+        }
+        return topic != null ? topic : "";
+    }
+
+    /**
+     * Initialize the tracer from the consumer's client.
+     * This is called lazily on the first message.
+     */
+    private void initializeIfNeeded(Consumer<T> consumer) {
+        if (!initialized && consumer instanceof ConsumerBase<?> consumerBase) {
+            PulsarClientImpl client = consumerBase.getClient();
+            InstrumentProvider instrumentProvider = 
client.instrumentProvider();
+
+            this.tracer = instrumentProvider.getTracer();
+            this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+            this.initialized = true;
+            if (consumerBase.getConf().getSubscriptionType() == 
SubscriptionType.Exclusive
+                    || consumerBase.getConf().getSubscriptionType() == 
SubscriptionType.Failover) {
+                ensureMapInitialized();
+            }
+        }
+    }
+
+    /**
+     * Ensure the map is initialized for cumulative acknowledgment support.
+     * This is called when we detect cumulative ack is being used.
+     */
+    private void ensureMapInitialized() {
+        if (messageSpansByTopic == null) {
+            messageSpansByTopic = new ConcurrentHashMap<>();
+            log.debug("Initialized message spans map for cumulative 
acknowledgment support");
+        }
+    }
+
+    @Override
+    public void close() {
+        // Clean up any remaining spans for Failover/Exclusive subscriptions
+        if (messageSpansByTopic != null) {
+            messageSpansByTopic.values().forEach(topicSpans ->
+                topicSpans.values().forEach(TracingContext::endSpan)
+            );
+            messageSpansByTopic.clear();
+        }
+    }
+
+    @Override
+    public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
+        // Initialize tracer from consumer on first call
+        initializeIfNeeded(consumer);
+
+        if (tracer == null || propagator == null) {
+            return message;
+        }
+
+        try {
+            if (topic == null) {
+                topic = consumer.getTopic();
+            }
+            if (subscription == null) {
+                subscription = consumer.getSubscription();
+            }
+
+            // Create a consumer span for this message
+            Span span = TracingContext.createConsumerSpan(tracer, topic, 
subscription, message, propagator);
+
+            if (TracingContext.isValid(span)) {
+                MessageId messageId = message.getMessageId();
+
+                // Store in map for cumulative ack support (Failover/Exclusive)
+                if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                    String topicKey = getTopicKey(messageId);
+                    messageSpansByTopic.computeIfAbsent(topicKey,
+                            k -> new 
ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span);
+                }
+
+                // Always attach span to message ID for individual ack/nack
+                if (messageId instanceof TraceableMessageId) {
+                    ((TraceableMessageId) messageId).setTracingSpan(span);
+                }
+
+                log.debug("Created consumer span for message {} on topic {}", 
messageId, topic);
+            }
+        } catch (Exception e) {
+            log.error("Error creating consumer span", e);
+        }
+
+        return message;
+    }
+
+    @Override
+    public void onAcknowledge(Consumer<T> consumer, MessageId messageId, 
Throwable exception) {
+        if (!(messageId instanceof TraceableMessageId)) {
+            return;
+        }
+
+        Span span = ((TraceableMessageId) messageId).getTracingSpan();
+        if (span != null) {
+            try {
+                if (exception != null) {
+                    TracingContext.endSpan(span, exception);
+                } else {
+                    // Add attribute to indicate acknowledgment type
+                    span.setAttribute("messaging.pulsar.acknowledgment.type", 
"acknowledge");
+                    TracingContext.endSpan(span);
+                }
+                // Clear the span from the message ID
+                ((TraceableMessageId) messageId).setTracingSpan(null);
+
+                // Remove from map if it exists (Failover/Exclusive)
+                if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                    String topicKey = getTopicKey(messageId);
+                    ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
+                    if (topicSpans != null) {
+                        topicSpans.remove((MessageIdAdv) messageId);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error ending consumer span on acknowledge", e);
+            }
+        }
+    }
+
+    @Override
+    public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId 
messageId, Throwable exception) {
+        if (!(messageId instanceof MessageIdAdv cumulativeAckPos)) {
+            // Fallback to simple ack for non-adv message IDs
+            if (messageId instanceof TraceableMessageId) {
+                Span span = ((TraceableMessageId) messageId).getTracingSpan();
+                if (span != null) {
+                    try {
+                        if (exception != null) {
+                            TracingContext.endSpan(span, exception);
+                        } else {
+                            // Add attribute to indicate acknowledgment type
+                            
span.setAttribute("messaging.pulsar.acknowledgment.type", 
"cumulative_acknowledge");
+                            TracingContext.endSpan(span);
+                        }
+                        ((TraceableMessageId) messageId).setTracingSpan(null);
+                    } catch (Exception e) {
+                        log.error("Error ending consumer span on cumulative 
acknowledge", e);
+                    }
+                }
+            }
+            return;
+        }
+
+        String topicKey = getTopicKey(messageId);
+
+        // Get the topic-specific map
+        ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic != null
+                ? messageSpansByTopic.get(topicKey) : null;
+
+        // First, try to get the span for the cumulative ack position itself
+        Span currentSpan = null;
+        if (messageId instanceof TraceableMessageId) {
+            currentSpan = ((TraceableMessageId) messageId).getTracingSpan();
+        }
+
+        // End spans for all messages in the topic-specific map up to the 
cumulative ack position
+        if (topicSpans != null) {
+            Iterator<Map.Entry<MessageIdAdv, Span>> iterator = 
topicSpans.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<MessageIdAdv, Span> entry = iterator.next();
+                MessageIdAdv msgId = entry.getKey();
+
+                // End spans for all messages <= cumulative ack position
+                if (msgId.compareTo(cumulativeAckPos) <= 0) {
+                    Span span = entry.getValue();
+                    try {
+                        if (exception != null) {
+                            TracingContext.endSpan(span, exception);
+                        } else {
+                            // Add attribute to indicate acknowledgment type
+                            
span.setAttribute("messaging.pulsar.acknowledgment.type", 
"cumulative_acknowledge");
+                            TracingContext.endSpan(span);
+                        }
+
+                        // Clear the span from the message ID
+                        if (msgId instanceof TraceableMessageId) {
+                            ((TraceableMessageId) msgId).setTracingSpan(null);
+                        }
+                    } catch (Exception e) {
+                        log.error("Error ending consumer span on cumulative 
acknowledge for message {}", msgId, e);
+                    }
+                    iterator.remove();
+                } else {
+                    // Since the map is sorted, we can break early
+                    break;
+                }
+            }
+
+            // Clean up empty topic map
+            if (topicSpans.isEmpty()) {
+                messageSpansByTopic.remove(topicKey);
+            }
+        }
+
+        // If the cumulative ack position span wasn't in the map, end it 
directly
+        if (currentSpan != null && messageId instanceof TraceableMessageId) {
+            try {
+                if (exception != null) {
+                    TracingContext.endSpan(currentSpan, exception);
+                } else {
+                    TracingContext.endSpan(currentSpan);
+                }
+                ((TraceableMessageId) messageId).setTracingSpan(null);
+            } catch (Exception e) {
+                log.error("Error ending consumer span on cumulative 
acknowledge", e);
+            }
+        }
+    }
+
+    @Override
+    public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> 
messageIds) {
+        for (MessageId messageId : messageIds) {
+            if (!(messageId instanceof TraceableMessageId)) {
+                continue;
+            }
+
+            Span span = ((TraceableMessageId) messageId).getTracingSpan();
+            if (span != null) {
+                try {
+                    // Add attribute to indicate negative acknowledgment (not 
an error, but normal flow)
+                    span.setAttribute("messaging.pulsar.acknowledgment.type", 
"negative_acknowledge");
+                    // End span normally - negative ack is expected behavior, 
not an error
+                    TracingContext.endSpan(span);
+                    // Clear the span from the message ID
+                    ((TraceableMessageId) messageId).setTracingSpan(null);
+
+                    // Remove from map if it exists (Failover/Exclusive)
+                    if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                        String topicKey = getTopicKey(messageId);
+                        ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
+                        if (topicSpans != null) {
+                            topicSpans.remove((MessageIdAdv) messageId);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error ending consumer span on negative 
acknowledge", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> 
messageIds) {
+        for (MessageId messageId : messageIds) {
+            if (!(messageId instanceof TraceableMessageId)) {
+                continue;
+            }
+
+            Span span = ((TraceableMessageId) messageId).getTracingSpan();
+            if (span != null) {
+                try {
+                    // Add attribute to indicate ack timeout (not an error, 
but expected behavior)
+                    span.setAttribute("messaging.pulsar.acknowledgment.type", 
"ack_timeout");
+                    // End span normally - ack timeout is expected behavior, 
not an error
+                    TracingContext.endSpan(span);
+                    // Clear the span from the message ID
+                    ((TraceableMessageId) messageId).setTracingSpan(null);
+
+                    // Remove from map if it exists (Failover/Exclusive)
+                    if (messageSpansByTopic != null && messageId instanceof 
MessageIdAdv) {
+                        String topicKey = getTopicKey(messageId);
+                        ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = 
messageSpansByTopic.get(topicKey);
+                        if (topicSpans != null) {
+                            topicSpans.remove((MessageIdAdv) messageId);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error ending consumer span on ack timeout", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
new file mode 100644
index 00000000000..1dc36566e6e
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TraceableMessage;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+import org.apache.pulsar.client.impl.ProducerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OpenTelemetry producer interceptor that creates spans for message 
publishing.
+ * <p>
+ * This interceptor automatically retrieves the Tracer from the client's 
InstrumentProvider,
+ * ensuring consistent OpenTelemetry configuration across the client.
+ * <p>
+ * Spans are attached directly to {@link TraceableMessage} instances, 
eliminating the need
+ * for external span tracking via maps.
+ */
+public class OpenTelemetryProducerInterceptor implements ProducerInterceptor {
+
+    private static final Logger log = 
LoggerFactory.getLogger(OpenTelemetryProducerInterceptor.class);
+
+    private Tracer tracer;
+    private TextMapPropagator propagator;
+    private String topic;
+    private boolean initialized = false;
+
+    public OpenTelemetryProducerInterceptor() {
+        // Tracer and propagator will be initialized in beforeSend when we 
have access to the producer
+    }
+
+    /**
+     * Initialize the tracer from the producer's client.
+     * This is called lazily on the first message.
+     */
+    private void initializeIfNeeded(Producer producer) {
+        if (!initialized && producer instanceof ProducerBase<?> producerBase) {
+            PulsarClientImpl client = producerBase.getClient();
+            InstrumentProvider instrumentProvider = 
client.instrumentProvider();
+
+            this.tracer = instrumentProvider.getTracer();
+            this.propagator = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+            this.initialized = true;
+        }
+    }
+
+    @Override
+    public void close() {
+        // Producer will fail pending messages when it being closed,
+        // which will trigger the `onSendAcknowledgement` events
+    }
+
+    @Override
+    public boolean eligible(Message message) {
+        return tracer != null && propagator != null;
+    }
+
+    @Override
+    public Message beforeSend(Producer producer, Message message) {
+        // Initialize tracer from producer on first call
+        initializeIfNeeded(producer);
+
+        if (!eligible(message)) {
+            return message;
+        }
+
+        try {
+            if (topic == null) {
+                topic = producer.getTopic();
+            }
+
+            // Create a span for this message publication
+            // The span will be linked to the current context, which may have 
been set by:
+            // 1. An active span in the current thread (e.g., from HTTP 
request handling)
+            // 2. Context propagated from upstream services
+            Span span = TracingContext.createProducerSpan(tracer, topic, 
Context.current());
+
+            if (TracingContext.isValid(span) && message instanceof 
TraceableMessage) {
+                // Attach the span directly to the message
+                ((TraceableMessage) message).setTracingSpan(span);
+                log.debug("Created producer span for message on topic {}", 
topic);
+            }
+        } catch (Exception e) {
+            log.error("Error creating producer span", e);
+        }
+
+        return message;
+    }
+
+    @Override
+    public void onSendAcknowledgement(Producer producer, Message message, 
MessageId msgId, Throwable exception) {
+        if (!(message instanceof TraceableMessage)) {
+            return;
+        }
+
+        Span span = ((TraceableMessage) message).getTracingSpan();
+        if (span != null) {
+            try {
+                if (msgId != null) {
+                    span.setAttribute("messaging.message.id", 
msgId.toString());
+                }
+
+                if (exception != null) {
+                    TracingContext.endSpan(span, exception);
+                } else {
+                    TracingContext.endSpan(span);
+                }
+
+                // Clear the span from the message
+                ((TraceableMessage) message).setTracingSpan(null);
+            } catch (Exception e) {
+                log.error("Error ending producer span", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
new file mode 100644
index 00000000000..514d02a64e7
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Utility class for managing OpenTelemetry tracing context in Pulsar messages.
+ */
+public class TracingContext {
+
+    private static final TextMapGetter<Map<String, String>> GETTER = new 
TextMapGetter<Map<String, String>>() {
+        @Override
+        public Iterable<String> keys(Map<String, String> carrier) {
+            return carrier.keySet();
+        }
+
+        @Nullable
+        @Override
+        public String get(@Nullable Map<String, String> carrier, String key) {
+            return carrier != null ? carrier.get(key) : null;
+        }
+    };
+
+    private static final TextMapSetter<Map<String, String>> SETTER = (carrier, 
key, value) -> {
+        if (carrier != null) {
+            carrier.put(key, value);
+        }
+    };
+
+    /**
+     * Extract trace context from message properties.
+     *
+     * @param message the message to extract context from
+     * @param propagator the text map propagator to use
+     * @return the extracted context
+     */
+    public static Context extractContext(Message<?> message, TextMapPropagator 
propagator) {
+        if (message == null || propagator == null) {
+            return Context.current();
+        }
+        return propagator.extract(Context.current(), message.getProperties(), 
GETTER);
+    }
+
+    /**
+     * Inject trace context into message properties.
+     *
+     * @param messageBuilder the message builder to inject context into
+     * @param context the context to inject
+     * @param propagator the text map propagator to use
+     */
+    public static <T> void injectContext(TypedMessageBuilder<T> 
messageBuilder, Context context,
+                                          TextMapPropagator propagator) {
+        if (messageBuilder == null || context == null || propagator == null) {
+            return;
+        }
+
+        Map<String, String> carrier = new HashMap<>();
+        propagator.inject(context, carrier, SETTER);
+
+        for (Map.Entry<String, String> entry : carrier.entrySet()) {
+            messageBuilder.property(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Create a producer span for message publishing.
+     *
+     * @param tracer the tracer to use
+     * @param topic the topic name
+     * @param parentContext the parent context (may be null)
+     * @return the created span
+     */
+    public static Span createProducerSpan(Tracer tracer, String topic, 
@Nullable Context parentContext) {
+        if (tracer == null) {
+            return Span.getInvalid();
+        }
+
+        Context context = parentContext != null ? parentContext : 
Context.current();
+        return tracer.spanBuilder("send " + topic)
+                .setParent(context)
+                .setSpanKind(SpanKind.PRODUCER)
+                .setAttribute("messaging.system", "pulsar")
+                .setAttribute("messaging.destination.name", topic)
+                .setAttribute("messaging.operation.name", "send")
+                .startSpan();
+    }
+
+    /**
+     * Create a consumer span for message consumption.
+     *
+     * @param tracer the tracer to use
+     * @param topic the topic name
+     * @param subscription the subscription name
+     * @param message the message being consumed
+     * @param propagator the text map propagator to use for context extraction
+     * @return the created span
+     */
+    public static Span createConsumerSpan(Tracer tracer, String topic, String 
subscription, Message<?> message,
+                                           TextMapPropagator propagator) {
+        if (tracer == null) {
+            return Span.getInvalid();
+        }
+
+        Context parentContext = extractContext(message, propagator);
+
+        return tracer.spanBuilder("process " + topic)
+                .setParent(parentContext)
+                .setSpanKind(SpanKind.CONSUMER)
+                .setAttribute("messaging.system", "pulsar")
+                .setAttribute("messaging.destination.name", topic)
+                .setAttribute("messaging.destination.subscription.name", 
subscription)
+                .setAttribute("messaging.operation.name", "process")
+                .setAttribute("messaging.message.id", 
message.getMessageId().toString())
+                .startSpan();
+    }
+
+    /**
+     * Mark a span as successful and end it.
+     *
+     * @param span the span to end
+     */
+    public static void endSpan(Span span) {
+        if (span != null) {
+            span.setStatus(StatusCode.OK);
+            span.end();
+        }
+    }
+
+    /**
+     * Mark a span as failed with an exception and end it.
+     *
+     * @param span the span to end
+     * @param throwable the exception that caused the failure
+     */
+    public static void endSpan(Span span, Throwable throwable) {
+        if (span != null) {
+            span.setStatus(StatusCode.ERROR, throwable.getMessage());
+            if (span.isRecording()) {
+                span.recordException(throwable);
+            }
+            span.end();
+        }
+    }
+
+    /**
+     * Check if a span has a valid context.
+     *
+     * @param span the span to check
+     * @return true if the span has a valid context
+     */
+    public static boolean isValid(Span span) {
+        return span != null && span.getSpanContext() != null && 
span.getSpanContext().isValid();
+    }
+
+    /**
+     * Get the span context from a span.
+     *
+     * @param span the span
+     * @return the span context
+     */
+    public static SpanContext getSpanContext(Span span) {
+        return span != null ? span.getSpanContext() : SpanContext.getInvalid();
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
new file mode 100644
index 00000000000..73d650339cb
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * OpenTelemetry tracing support for Pulsar Java client.
+ *
+ * <h2>Overview</h2>
+ * This package provides OpenTelemetry distributed tracing capabilities for 
Pulsar producers and consumers.
+ * It automatically creates spans for message publishing, consumption, and 
acknowledgment operations,
+ * and propagates trace context across services using message properties.
+ *
+ * <h2>Producer Tracing</h2>
+ * Producer tracing tracks:
+ * <ul>
+ *   <li><b>publish</b> - Span created when send is called</li>
+ *   <li><b>published</b> - Span completed when broker confirms receipt</li>
+ * </ul>
+ *
+ * <h3>Basic Producer Example</h3>
+ * <pre>{@code
+ * // Configure OpenTelemetry (or use auto-instrumentation)
+ * OpenTelemetry openTelemetry = ...;
+ *
+ * // Create producer with tracing interceptor
+ * Producer<String> producer = client.newProducer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .intercept(new OpenTelemetryProducerInterceptor())
+ *     .create();
+ *
+ * // Send message - trace context is automatically propagated
+ * producer.newMessage()
+ *     .value("Hello World")
+ *     .send();
+ * }</pre>
+ *
+ * <p>
+ * Trace context is automatically injected into message properties from the 
current thread's context.
+ * This means if your code is running within a traced HTTP request or any 
other active span,
+ * the trace will automatically continue through Pulsar messages.
+ *
+ * <h2>Consumer Tracing</h2>
+ * Consumer tracing tracks:
+ * <ul>
+ *   <li><b>consume</b> - Span created when message is received</li>
+ *   <li><b>ack</b> - Span completed when message is acknowledged</li>
+ *   <li><b>nack</b> - Span completed when message is negatively 
acknowledged</li>
+ * </ul>
+ *
+ * <h3>Basic Consumer Example</h3>
+ * <pre>{@code
+ * // Create consumer with tracing interceptor
+ * Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .subscriptionName("my-subscription")
+ *     .intercept(new OpenTelemetryConsumerInterceptor<>())
+ *     .subscribe();
+ *
+ * // Receive and process messages - trace context is automatically extracted
+ * while (true) {
+ *     Message<String> msg = consumer.receive();
+ *     try {
+ *         // Process message
+ *         System.out.println("Received: " + msg.getValue());
+ *         consumer.acknowledge(msg);
+ *     } catch (Exception e) {
+ *         consumer.negativeAcknowledge(msg);
+ *     }
+ * }
+ * }</pre>
+ *
+ * <h2>End-to-End Tracing Example</h2>
+ * <pre>{@code
+ * // Service 1: HTTP endpoint that publishes to Pulsar
+ * // When using auto-instrumentation or OpenTelemetry SDK, the HTTP request
+ * // will have an active span context that automatically propagates to Pulsar
+ * @POST
+ * @Path("/publish")
+ * public Response publishMessage(String body) {
+ *     // Send message - trace context automatically injected!
+ *     producer.newMessage()
+ *         .value(body)
+ *         .send();
+ *
+ *     return Response.ok().build();
+ * }
+ *
+ * // Service 2: Consumer that processes messages
+ * Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .subscriptionName("my-subscription")
+ *     .intercept(new OpenTelemetryConsumerInterceptor<>())
+ *     .subscribe();
+ *
+ * // Process messages - trace continues from HTTP request
+ * Message<String> msg = consumer.receive();
+ * // Trace context is automatically extracted from message properties
+ * processMessage(msg.getValue());
+ * consumer.acknowledge(msg);
+ *
+ * // The entire flow from HTTP request -> Producer -> Consumer is now traced!
+ * }</pre>
+ *
+ * <h2>Configuration</h2>
+ * OpenTelemetry can be configured via:
+ * <ul>
+ *   <li>Java Agent auto-instrumentation</li>
+ *   <li>Environment variables (OTEL_*)</li>
+ *   <li>System properties (otel.*)</li>
+ *   <li>Programmatic configuration</li>
+ * </ul>
+ *
+ * <h2>Trace Context Propagation</h2>
+ * Trace context is propagated using W3C TraceContext format via message 
properties:
+ * <ul>
+ *   <li><b>traceparent</b> - Contains trace ID, span ID, and trace flags</li>
+ *   <li><b>tracestate</b> - Contains vendor-specific trace information</li>
+ * </ul>
+ *
+ * @see OpenTelemetryProducerInterceptor
+ * @see OpenTelemetryConsumerInterceptor
+ * @see TracingContext
+ */
+package org.apache.pulsar.client.impl.tracing;
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 8b031fbd38b..0ca83d50a96 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -101,9 +103,9 @@ public class ConsumerBuilderImplTest {
     @Test
     public void testConsumerBuilderImpl() throws PulsarClientException {
         Consumer consumer = mock(Consumer.class);
-        when(consumerBuilderImpl.subscribeAsync())
-                .thenReturn(CompletableFuture.completedFuture(consumer));
-        assertNotNull(consumerBuilderImpl.topic(TOPIC_NAME).subscribe());
+        ConsumerBuilderImpl spyBuilder = spy(consumerBuilderImpl);
+        
doReturn(CompletableFuture.completedFuture(consumer)).when(spyBuilder).subscribeAsync();
+        assertNotNull(spyBuilder.topic(TOPIC_NAME).subscribe());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index bd96cf27aee..45e31e00b4c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.testng.annotations.BeforeClass;
@@ -54,6 +55,7 @@ public class ProducerBuilderImplTest {
         client = mock(PulsarClientImpl.class);
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(client.getCnxPool()).thenReturn(connectionPool);
+        when(client.getConfiguration()).thenReturn(new 
ClientConfigurationData());
         producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
         when(client.newProducer()).thenReturn(producerBuilderImpl);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
new file mode 100644
index 00000000000..bbf92b9ffb1
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for OpenTelemetry tracing integration.
+ */
+public class OpenTelemetryTracingTest {
+
+    private InMemorySpanExporter spanExporter;
+    private OpenTelemetrySdk openTelemetry;
+    private Tracer tracer;
+    private TextMapPropagator propagator;
+
+    @BeforeClass
+    public void setup() {
+        spanExporter = InMemorySpanExporter.create();
+        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
+                .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+                .build();
+
+        openTelemetry = OpenTelemetrySdk.builder()
+                .setTracerProvider(tracerProvider)
+                .build();
+
+        tracer = openTelemetry.getTracer("test-tracer");
+        propagator = openTelemetry.getPropagators().getTextMapPropagator();
+    }
+
+    @AfterClass
+    public void tearDown() {
+        if (openTelemetry != null) {
+            openTelemetry.close();
+        }
+    }
+
+    @Test
+    public void testCreateProducerSpan() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        Span span = TracingContext.createProducerSpan(tracer, topic, null);
+
+        assertNotNull(span);
+        assertTrue(span.isRecording());
+        assertTrue(TracingContext.isValid(span));
+
+        TracingContext.endSpan(span);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getName(), "send " + topic);
+        assertEquals(spanData.getKind(), SpanKind.PRODUCER);
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name")),
 topic);
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation.name")),
 "send");
+    }
+
+    @Test
+    public void testCreateConsumerSpan() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        String subscription = "test-sub";
+        Map<String, String> properties = new HashMap<>();
+        properties.put("test-key", "test-value");
+
+        Message<?> message = createTestMessage(properties);
+
+        Span span = TracingContext.createConsumerSpan(tracer, topic, 
subscription, message, propagator);
+
+        assertNotNull(span);
+        assertTrue(span.isRecording());
+        assertTrue(TracingContext.isValid(span));
+
+        TracingContext.endSpan(span);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getName(), "process " + topic);
+        assertEquals(spanData.getKind(), SpanKind.CONSUMER);
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), 
"pulsar");
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name")),
 topic);
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.subscription.name")),
+                subscription);
+        assertEquals(spanData.getAttributes().get(
+                
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation.name")),
 "process");
+    }
+
+    @Test
+    public void testSpanWithException() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        Span span = TracingContext.createProducerSpan(tracer, topic, null);
+
+        RuntimeException exception = new RuntimeException("Test exception");
+        TracingContext.endSpan(span, exception);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getStatus().getStatusCode(), 
io.opentelemetry.api.trace.StatusCode.ERROR);
+        assertFalse(spanData.getEvents().isEmpty());
+    }
+
+    @Test
+    public void testContextPropagation() {
+        spanExporter.reset();
+
+        // Create a parent span
+        Span parentSpan = tracer.spanBuilder("parent").startSpan();
+        try (Scope scope = parentSpan.makeCurrent()) {
+            // Create a producer span as child
+            String topic = "test-topic";
+            Span producerSpan = TracingContext.createProducerSpan(tracer, 
topic, Context.current());
+
+            assertNotNull(producerSpan);
+            assertTrue(TracingContext.isValid(producerSpan));
+
+            TracingContext.endSpan(producerSpan);
+        } finally {
+            parentSpan.end();
+        }
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 2);
+
+        // Verify parent-child relationship
+        SpanData producerSpan = spans.get(0);
+        SpanData parentSpanData = spans.get(1);
+
+        assertEquals(producerSpan.getParentSpanId(), 
parentSpanData.getSpanId());
+    }
+
+
+    private Message<?> createTestMessage(Map<String, String> properties) {
+        // Create a simple MessageMetadata with properties
+        org.apache.pulsar.common.api.proto.MessageMetadata metadata =
+                new org.apache.pulsar.common.api.proto.MessageMetadata();
+
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            metadata.addProperty()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue());
+        }
+
+        // Create a message with metadata
+        MessageImpl<?> message = MessageImpl.create(
+                metadata,
+                java.nio.ByteBuffer.wrap("test".getBytes()),
+                org.apache.pulsar.client.api.Schema.BYTES,
+                "test-topic"
+        );
+
+        // Set a MessageId on the message
+        message.setMessageId(new 
org.apache.pulsar.client.impl.MessageIdImpl(1L, 1L, -1));
+
+        return message;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
new file mode 100644
index 00000000000..d50c45a1df5
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import org.testng.annotations.Test;
+
+/**
+ * Example test demonstrating OpenTelemetry tracing usage patterns.
+ * These are code examples for documentation purposes.
+ */
+public class TracingExampleTest {
+
+    /**
+     * Example 1: Basic producer with tracing.
+     */
+    @Test(enabled = false)
+    public void exampleBasicProducerTracing() throws Exception {
+        // Configure OpenTelemetry (or use auto-instrumentation)
+        // OpenTelemetry openTelemetry = ...;
+
+        // Create Pulsar client
+        // PulsarClient client = PulsarClient.builder()
+        //     .serviceUrl("pulsar://localhost:6650")
+        //     .build();
+
+        // Create producer with tracing interceptor
+        // Producer<String> producer = client.newProducer(Schema.STRING)
+        //     .topic("my-topic")
+        //     .intercept(new OpenTelemetryProducerInterceptor())
+        //     .create();
+
+        // Send message - trace context is automatically propagated
+        // producer.newMessage()
+        //     .value("Hello World")
+        //     .send();
+    }
+
+    /**
+     * Example 2: Producer with automatic tracing.
+     */
+    @Test(enabled = false)
+    public void exampleProducerWithAutomaticTracing() throws Exception {
+        // Create Pulsar client with tracing enabled
+        // PulsarClient client = PulsarClient.builder()
+        //     .serviceUrl("pulsar://localhost:6650")
+        //     .openTelemetry(openTelemetry, true)  // Enable automatic tracing
+        //     .build();
+
+        // Producer automatically has tracing enabled
+        // Producer<String> producer = client.newProducer(Schema.STRING)
+        //     .topic("my-topic")
+        //     .create();
+
+        // Send message - trace context is automatically injected
+        // producer.newMessage()
+        //     .value("Hello World")
+        //     .send();
+    }
+
+    /**
+     * Example 3: Basic consumer with tracing.
+     */
+    @Test(enabled = false)
+    public void exampleBasicConsumerTracing() throws Exception {
+        // Create Pulsar client
+        // PulsarClient client = PulsarClient.builder()
+        //     .serviceUrl("pulsar://localhost:6650")
+        //     .build();
+
+        // Create consumer with tracing interceptor
+        // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        //     .topic("my-topic")
+        //     .subscriptionName("my-subscription")
+        //     .intercept(new OpenTelemetryConsumerInterceptor<>())
+        //     .subscribe();
+
+        // Receive and process messages - trace context is automatically 
extracted
+        // while (true) {
+        //     Message<String> msg = consumer.receive();
+        //     try {
+        //         // Process message
+        //         System.out.println("Received: " + msg.getValue());
+        //         consumer.acknowledge(msg);
+        //     } catch (Exception e) {
+        //         consumer.negativeAcknowledge(msg);
+        //     }
+        // }
+    }
+
+    /**
+     * Example 4: End-to-end tracing from HTTP to Pulsar (automatic).
+     */
+    @Test(enabled = false)
+    public void exampleEndToEndTracing() throws Exception {
+        // ===== HTTP Service =====
+        // When the HTTP framework has OpenTelemetry instrumentation,
+        // the active span context automatically propagates to Pulsar
+
+        // Producer - trace context automatically injected from HTTP span
+        // producer.newMessage()
+        //     .value("Message from HTTP request")
+        //     .send();
+
+        // ===== Consumer Service =====
+        // In another service/application
+
+        // Consumer with tracing
+        // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        //     .topic("my-topic")
+        //     .subscriptionName("my-subscription")
+        //     .intercept(new OpenTelemetryConsumerInterceptor<>())
+        //     .subscribe();
+
+        // Process messages - trace continues from HTTP request
+        // Message<String> msg = consumer.receive();
+        // // Trace context is automatically extracted from message properties
+        // processMessage(msg.getValue());
+        // consumer.acknowledge(msg);
+
+        // The entire flow from HTTP request -> Producer -> Consumer is now 
traced!
+    }
+
+    /**
+     * Example 5: Custom span creation in message processing.
+     */
+    @Test(enabled = false)
+    public void exampleCustomSpanCreation() throws Exception {
+        // When you need to create custom spans during message processing
+
+        // Consumer with tracing
+        // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        //     .topic("my-topic")
+        //     .subscriptionName("my-subscription")
+        //     .intercept(new OpenTelemetryConsumerInterceptor<>())
+        //     .subscribe();
+
+        // Get tracer
+        // Tracer tracer = 
GlobalOpenTelemetry.get().getTracer("my-application");
+
+        // Process messages
+        // Message<String> msg = consumer.receive();
+
+        // The consumer interceptor already created a span, so we're in that 
context
+        // Create a child span for custom processing
+        // Span processingSpan = tracer.spanBuilder("process-message")
+        //     .setSpanKind(SpanKind.INTERNAL)
+        //     .startSpan();
+
+        // try (Scope scope = processingSpan.makeCurrent()) {
+        //     // Your processing logic here
+        //     // Any spans created here will be children of processingSpan
+        //     doSomeProcessing(msg.getValue());
+        //     processingSpan.setStatus(StatusCode.OK);
+        // } catch (Exception e) {
+        //     processingSpan.recordException(e);
+        //     processingSpan.setStatus(StatusCode.ERROR);
+        //     throw e;
+        // } finally {
+        //     processingSpan.end();
+        //     consumer.acknowledge(msg);
+        // }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index cb4c93f153f..40876e57df9 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
@@ -103,6 +104,7 @@ public class ContextImplTest {
 
         producer = mock(Producer.class);
         client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(new 
ClientConfigurationData());
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.newProducer()).thenAnswer(invocation -> new 
ProducerBuilderImpl(client, Schema.BYTES));

Reply via email to