This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af6c2dd13c5 [feat][pip] PIP-446: Support Native OpenTelemetry Tracing
in Pulsar Java Client (#24857)
af6c2dd13c5 is described below
commit af6c2dd13c577fe6c20c577396e4c9a70465b345
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jan 2 22:16:12 2026 +0800
[feat][pip] PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java
Client (#24857)
---
pip/pip-446.md | 524 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 524 insertions(+)
diff --git a/pip/pip-446.md b/pip/pip-446.md
new file mode 100644
index 00000000000..fc21970b429
--- /dev/null
+++ b/pip/pip-446.md
@@ -0,0 +1,524 @@
+# PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java Client
+
+# Background knowledge
+
+## OpenTelemetry
+
+OpenTelemetry is a vendor-neutral observability framework that provides APIs,
SDKs, and tools for collecting distributed traces, metrics, and logs. It has
become the industry standard for observability, adopted by major cloud
providers and APM vendors.
+
+## Distributed Tracing
+
+Distributed tracing tracks requests as they flow through distributed systems.
A **trace** represents the entire journey of a request, composed of multiple
**spans**. Each span represents a single operation (e.g., sending a message,
processing a request). Spans form parent-child relationships, creating a trace
tree that visualizes request flow across services.
+
+## W3C Trace Context
+
+The W3C Trace Context specification defines a standard way to propagate trace
context across service boundaries using HTTP headers or message properties:
+- `traceparent`: Contains trace ID, span ID, and trace flags
+- `tracestate`: Contains vendor-specific trace information
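+
+As a rough illustration of what `traceparent` carries, the sketch below parses a version `00` value into its trace ID, span ID, and sampled flag. The `TraceParent` class is hypothetical and is not part of Pulsar or OpenTelemetry. It also skips some checks the specification requires, such as rejecting all-zero IDs, so treat it as a reading aid rather than a validator.
+

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Pulsar or OpenTelemetry class) for the version-00 traceparent layout. */
final class TraceParent {
    // Layout: version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - trace-flags (2 hex)
    private static final Pattern FORMAT = Pattern.compile(
            "([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String spanId;
    final boolean sampled;

    private TraceParent(String traceId, String spanId, boolean sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    /** Parses a traceparent value, rejecting anything that does not match the layout above. */
    static TraceParent parse(String value) {
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed traceparent: " + value);
        }
        int flags = Integer.parseInt(m.group(4), 16);
        // Bit 0 of trace-flags is the "sampled" flag.
        return new TraceParent(m.group(2), m.group(3), (flags & 0x01) != 0);
    }

    public static void main(String[] args) {
        TraceParent tp = parse("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        System.out.println(tp.traceId + " " + tp.spanId + " sampled=" + tp.sampled);
    }
}
```

+
+In practice a client would normally let OpenTelemetry's W3C propagator read and write these values instead of parsing them by hand.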
+
+## Pulsar Interceptors
+
+Pulsar client interceptors allow users to intercept and modify messages before
sending (producer) or after receiving (consumer). They provide hooks for
cross-cutting concerns like tracing, metrics, and security.
+
+## Cumulative Acknowledgment
+
+In Pulsar, cumulative acknowledgment allows consumers to acknowledge all
messages up to a specific message ID in one operation. This is only available
for Failover and Exclusive subscription types where message order is
guaranteed. When a message is cumulatively acknowledged, all previous messages
on that partition are implicitly acknowledged.
+
+# Motivation
+
+Currently, the Pulsar Java client lacks native support for distributed tracing
with OpenTelemetry. While the OpenTelemetry Java Agent can automatically
instrument Pulsar clients, there are several limitations:
+
+1. **Agent-only approach**: Users must use the Java Agent, which may not be
suitable for all deployment scenarios (e.g., serverless, embedded applications)
+2. **Limited control**: Users cannot easily customize tracing behavior or
selectively enable tracing for specific producers/consumers
+3. **Missing first-class support**: Other Apache projects (Kafka, Camel)
provide native OpenTelemetry support, making Pulsar less competitive
+4. **Complex setup**: Users must understand agent configuration and classpath
setup
+
+Native OpenTelemetry support would:
+- Provide a programmatic API for tracing configuration
+- Enable selective tracing without agent overhead
+- Improve observability in production systems
+- Align Pulsar with modern observability practices
+- Make it easier to diagnose performance issues and message flow
+
+# Goals
+
+## In Scope
+
+1. **Producer tracing**: Create spans for message send operations with
automatic trace context injection
+2. **Consumer tracing**: Create spans for message receive/process operations
with automatic trace context extraction
+3. **Trace context propagation**: Inject and extract W3C Trace Context via
message properties
+4. **Programmatic API**: Enable tracing via `ClientBuilder` API
+5. **Interceptor-based design**: Implement using Pulsar's existing interceptor
mechanism
+6. **Cumulative acknowledgment support**: Properly handle span lifecycle for
cumulative acks
+7. **Multi-topic consumer support**: Track spans across multiple topic
partitions
+8. **Agent compatibility**: Ensure compatibility with OpenTelemetry Java Agent
+9. **Semantic conventions**: Follow OpenTelemetry messaging semantic
conventions
+10. **Zero overhead when disabled**: No performance impact when tracing is not
enabled
+
+## Out of Scope
+
+1. **Broker-side tracing**: This PIP focuses on client-side tracing only
+2. **Metrics collection**: Only distributed tracing, not OpenTelemetry metrics
+3. **Log correlation**: Only tracing integration, not log integration
+4. **Custom propagators**: Only W3C Trace Context format supported initially
+5. **Transaction tracing**: Tracing for Pulsar transactions (future
enhancement)
+6. **Schema registry tracing**: Tracing for schema operations
+7. **Admin API tracing**: Tracing for admin operations
+
+# High Level Design
+
+The implementation adds native OpenTelemetry tracing to the Pulsar Java client
through:
+
+## 1. New Interfaces
+
+Two new interfaces enable attaching tracing spans to messages and message IDs:
+- `TraceableMessage`: Allows messages to carry OpenTelemetry spans
+- `TraceableMessageId`: Allows message IDs to carry OpenTelemetry spans
+
+## 2. Interceptors
+
+Two interceptors implement the tracing logic:
+- `OpenTelemetryProducerInterceptor`: Creates producer spans and injects trace
context
+- `OpenTelemetryConsumerInterceptor`: Creates consumer spans and extracts
trace context
+
+## 3. Configuration API
+
+Users can enable tracing through the `ClientBuilder`:
+```java
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+```
+
+When enabled, the client automatically adds tracing interceptors to all
producers and consumers.
+
+## 4. Trace Context Propagation
+
+The implementation uses W3C Trace Context format to propagate trace context:
+- **Producer**: Injects `traceparent` and `tracestate` into message properties
+- **Consumer**: Extracts trace context from message properties
+
+This enables end-to-end tracing across services that communicate via Pulsar.
+
+## 5. Span Lifecycle Management
+
+The implementation carefully manages span lifecycle:
+- **Producer spans**: Start on send, end on broker acknowledgment (or error)
+- **Consumer spans**: Start on receive, end on acknowledgment (or negative ack)
+- **Cumulative ack**: Ends all spans for messages up to the acknowledged
position
+
+## 6. Multi-Topic Support
+
+For multi-topic consumers, the implementation maintains separate span maps per
topic partition to correctly handle cumulative acknowledgments across multiple
topics.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### 1. Traceable Interfaces
+
+**TraceableMessage interface** (`pulsar-client-api`):
+```java
+public interface TraceableMessage {
+ void setTracingSpan(io.opentelemetry.api.trace.Span span);
+ io.opentelemetry.api.trace.Span getTracingSpan();
+}
+```
+
+**TraceableMessageId interface** (`pulsar-client-api`):
+```java
+public interface TraceableMessageId {
+ void setTracingSpan(io.opentelemetry.api.trace.Span span);
+ io.opentelemetry.api.trace.Span getTracingSpan();
+}
+```
+
+Both `MessageImpl` and `MessageIdImpl` implement these interfaces by adding a
transient field to store the span without affecting serialization.
+
+### 2. OpenTelemetryProducerInterceptor
+
+Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/`
+
+**Key methods**:
+- `beforeSend()`: Creates a producer span and injects trace context into
message properties
+- `onSendAcknowledgement()`: Ends the span successfully and records message ID
+- `onPartitionsChange()`: No-op (not needed for producer)
+
+**Span creation**:
+- Uses `TracingContext.createProducerSpan()` to create a PRODUCER span
+- Span name: `send {topic}`
+- Attributes: `messaging.system`, `messaging.destination.name`,
`messaging.operation.name`
+- Records `messaging.message.id` when broker acknowledges
+
+**Trace context injection**:
+- Uses OpenTelemetry `TextMapPropagator` to inject context into message
properties
+- Injects `traceparent` and `tracestate` headers
+- Only injects if not already present (allows compatibility with Java Agent)
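+
+The "only injects if not already present" rule can be sketched with a plain `Map` standing in for message properties. This is an assumption-laden illustration: `TraceContextInjectionSketch` and `injectIfAbsent` are hypothetical names, and the actual interceptor works through OpenTelemetry's `TextMapPropagator`, not through direct map writes.
+

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of "inject only if absent"; a plain Map stands in for message properties. */
final class TraceContextInjectionSketch {
    static final String TRACEPARENT = "traceparent";
    static final String TRACESTATE = "tracestate";

    /** Returns true if the context was written, false if an existing traceparent was kept. */
    static boolean injectIfAbsent(Map<String, String> properties, String traceparent, String tracestate) {
        if (properties.containsKey(TRACEPARENT)) {
            // Another instrumentation layer (for example the Java Agent) already injected context.
            return false;
        }
        properties.put(TRACEPARENT, traceparent);
        if (tracestate != null && !tracestate.isEmpty()) {
            properties.put(TRACESTATE, tracestate);
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        String first = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        String second = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
        System.out.println(injectIfAbsent(properties, first, "vendor=value"));
        System.out.println(injectIfAbsent(properties, second, null));
        System.out.println(properties.get(TRACEPARENT));
    }
}
```

+
+Keeping the first writer's context means a message instrumented by both the agent and the native interceptor still carries a single, consistent `traceparent`.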
+
+### 3. OpenTelemetryConsumerInterceptor
+
+Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/`
+
+**Key methods**:
+- `beforeConsume()`: Extracts trace context and creates a consumer span
+- `onAcknowledge()`: Ends the span for individual ack with OK status
+- `onAcknowledgeCumulative()`: Ends all spans up to the acknowledged position
with OK status
+- `onNegativeAcksSend()`: Ends the span with OK status and adds an event (not
an error)
+- `onAckTimeoutSend()`: Ends the span with OK status and adds an event (not an
error)
+
+**Span creation**:
+- Uses `TracingContext.createConsumerSpan()` to create a CONSUMER span
+- Span name: `process {topic}`
+- Attributes: `messaging.system`, `messaging.destination.name`,
`messaging.operation.name`, `messaging.message.id`
+- Links to producer span via extracted trace context
+
+**Cumulative acknowledgment handling**:
+- Maintains `Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>>` for
Failover/Exclusive subscriptions
+- Outer map key: topic partition (from `TopicMessageId.getOwnerTopic()`)
+- Inner map: sorted message IDs to spans for efficient range operations
+- When cumulative ack occurs, removes and ends all spans up to the
acknowledged position
+- Zero overhead for Shared/Key_Shared subscriptions (map is null)
+
+**Multi-topic support**:
+- Nested map structure handles messages from multiple topic partitions
+- Each topic partition maintains independent sorted span map
+- Cumulative ack only affects spans from the same topic partition
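+
+The nested map and its range operation can be sketched with JDK types only. In this simplified stand-in, `Long` replaces `MessageIdAdv` and `String` replaces `Span`; the class and method names are hypothetical, and "ending" a span is modeled as returning it to the caller.
+

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical stand-in for the interceptor's span bookkeeping (Long = message ID, String = span). */
final class CumulativeAckSketch {
    // Outer key: topic partition. Inner map: message IDs sorted ascending, each mapped to its open span.
    private final Map<String, ConcurrentSkipListMap<Long, String>> spansByPartition =
            new ConcurrentHashMap<>();

    void track(String topicPartition, long messageId, String span) {
        spansByPartition
                .computeIfAbsent(topicPartition, k -> new ConcurrentSkipListMap<>())
                .put(messageId, span);
    }

    /** Removes and returns every span at or before ackedId on one partition, in message ID order. */
    List<String> ackCumulative(String topicPartition, long ackedId) {
        List<String> ended = new ArrayList<>();
        ConcurrentSkipListMap<Long, String> spans = spansByPartition.get(topicPartition);
        if (spans == null) {
            return ended;
        }
        // headMap(..., true) is a live view of all entries with ID <= ackedId.
        ConcurrentNavigableMap<Long, String> upToAcked = spans.headMap(ackedId, true);
        Map.Entry<Long, String> entry;
        while ((entry = upToAcked.pollFirstEntry()) != null) {
            ended.add(entry.getValue());
        }
        return ended;
    }

    int openSpans(String topicPartition) {
        ConcurrentSkipListMap<Long, String> spans = spansByPartition.get(topicPartition);
        return spans == null ? 0 : spans.size();
    }

    public static void main(String[] args) {
        CumulativeAckSketch sketch = new CumulativeAckSketch();
        sketch.track("my-topic-partition-0", 1L, "span-a");
        sketch.track("my-topic-partition-0", 2L, "span-b");
        sketch.track("my-topic-partition-1", 1L, "span-c");
        System.out.println(sketch.ackCumulative("my-topic-partition-0", 2L));
        System.out.println(sketch.openSpans("my-topic-partition-1"));
    }
}
```

+
+Because each partition has its own sorted map, a cumulative ack on one partition never touches spans tracked for another.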
+
+**Acknowledgment type tracking**:
+- Every consumer span includes a `messaging.pulsar.acknowledgment.type`
attribute indicating how it was completed:
+ - `"acknowledge"`: Normal individual acknowledgment
+ - `"cumulative_acknowledge"`: Cumulative acknowledgment
+ - `"negative_acknowledge"`: Message negatively acknowledged (will be
redelivered)
+ - `"ack_timeout"`: Acknowledgment timeout (will be redelivered)
+- Negative ack and ack timeout end spans with **OK status** (not ERROR)
because they are normal Pulsar message flow
+- This design separates messaging operations (which succeed) from application
logic failures (which should be tracked in separate child spans)
+- When a message is redelivered, a new consumer span is created for the new
delivery attempt
+- The attribute allows users to query and analyze retry patterns, timeout
issues, and acknowledgment types in their tracing backend
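+
+One way to picture the four attribute values is as an enum that also records whether the message will be redelivered. The `AckType` enum below is a hypothetical illustration built from the list above; the PIP does not state how the implementation represents these values internally.
+

```java
/** Hypothetical enum mirroring the `messaging.pulsar.acknowledgment.type` values listed above. */
enum AckType {
    ACKNOWLEDGE("acknowledge", false),
    CUMULATIVE_ACKNOWLEDGE("cumulative_acknowledge", false),
    NEGATIVE_ACKNOWLEDGE("negative_acknowledge", true),
    ACK_TIMEOUT("ack_timeout", true);

    static final String ATTRIBUTE_KEY = "messaging.pulsar.acknowledgment.type";

    private final String attributeValue;
    private final boolean redelivered;

    AckType(String attributeValue, boolean redelivered) {
        this.attributeValue = attributeValue;
        this.redelivered = redelivered;
    }

    String attributeValue() {
        return attributeValue;
    }

    /** True for the completion types that the list above marks as "will be redelivered". */
    boolean leadsToRedelivery() {
        return redelivered;
    }

    /** Maps an attribute value read from a span back to its enum constant. */
    static AckType fromAttribute(String value) {
        for (AckType type : values()) {
            if (type.attributeValue.equals(value)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown acknowledgment type: " + value);
    }

    public static void main(String[] args) {
        for (AckType type : values()) {
            System.out.println(ATTRIBUTE_KEY + "=" + type.attributeValue()
                    + " redelivery=" + type.leadsToRedelivery());
        }
    }
}
```

+
+A tracing backend query that filters on the two redelivery values would then surface retry and timeout patterns without treating those spans as errors.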
+
+### 4. TracingContext Utility
+
+Provides helper methods for span creation and management:
+- `createProducerSpan()`: Creates a producer span with correct attributes
+- `createConsumerSpan()`: Creates a consumer span with trace context extraction
+- `endSpan()`: Safely ends a span
+- `endSpan(span, exception)`: Ends a span with error status
+- `isValid()`: Checks if a span is valid and recording
+
+### 5. TracingProducerBuilder
+
+Helper for manual trace context injection (advanced use cases):
+- `injectContext()`: Injects trace context into message properties
+- `extractFromHeaders()`: Extracts trace context from HTTP headers
+
+### 6. ClientBuilder Integration
+
+**New API methods** (`ClientBuilder`):
+```java
+ClientBuilder enableTracing(boolean tracingEnabled);
+```
+
+**Implementation** (`ClientBuilderImpl`):
+- `enableTracing()` stores `tracingEnabled` flag in `ClientConfigurationData`
+- Passes configuration to `PulsarClientImpl`
+
+**Automatic interceptor addition** (`ConsumerBuilderImpl`, `ProducerBuilderImpl`):
+- Checks if tracing is enabled in client configuration
+- Automatically adds appropriate interceptor if enabled
+- User-provided interceptors are preserved and combined
+
+### 7. InstrumentProvider Enhancement
+
+Enhanced to provide OpenTelemetry instance:
+```java
+public OpenTelemetry getOpenTelemetry();
+```
+
+Falls back to `GlobalOpenTelemetry.get()` if not explicitly configured.
+
+### 8. Implementation Classes
+
+**Modified classes**:
+- `MessageImpl`: Implements `TraceableMessage`
+- `MessageIdImpl`: Implements `TraceableMessageId`
+- `TopicMessageImpl`: Delegates `TraceableMessage` methods to wrapped message
+- `TopicMessageIdImpl`: Delegates `TraceableMessageId` methods to wrapped
message ID
+- `ConsumerBase`: Provides `getSubscriptionType()` for interceptors
+- `ConsumerBuilderImpl`: Auto-adds consumer interceptor when enabled
+- `ProducerBuilderImpl`: Auto-adds producer interceptor when enabled
+- `PulsarClientImpl`: Stores and provides OpenTelemetry configuration
+- `ClientConfigurationData`: Stores OpenTelemetry and enableTracing settings
+
+### 9. Span Attributes
+
+Following [OpenTelemetry messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/):
+
+**Producer spans**:
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.operation.name`: "send"
+- `messaging.message.id`: Message ID (added on ack)
+
+**Consumer spans**:
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.destination.subscription.name`: Subscription name
+- `messaging.operation.name`: "process"
+- `messaging.message.id`: Message ID
+- `messaging.pulsar.acknowledgment.type`: Custom attribute indicating how the
message was acknowledged:
+ - `"acknowledge"`: Individual acknowledgment
+ - `"cumulative_acknowledge"`: Cumulative acknowledgment
+ - `"negative_acknowledge"`: Negative acknowledgment (message will be
redelivered)
+ - `"ack_timeout"`: Acknowledgment timeout (message will be redelivered)
+
+**Rationale for `messaging.pulsar.acknowledgment.type` attribute**:
+- Provides visibility into message acknowledgment patterns
+- Enables querying for retry scenarios (negative ack, timeout)
+- Helps identify timeout configuration issues
+- Allows analysis of cumulative vs. individual acknowledgment usage
+- Uses attribute (not event) for better queryability in tracing backends
+
+## Public-facing Changes
+
+### Public API
+
+**New interfaces** (`org.apache.pulsar.client.api`):
+
+```java
+public interface TraceableMessage {
+ void setTracingSpan(io.opentelemetry.api.trace.Span span);
+ io.opentelemetry.api.trace.Span getTracingSpan();
+}
+
+public interface TraceableMessageId {
+ void setTracingSpan(io.opentelemetry.api.trace.Span span);
+ io.opentelemetry.api.trace.Span getTracingSpan();
+}
+```
+
+**New `ClientBuilder` method**:
+
+```java
+/**
+ * Enable or disable automatic tracing.
+ * When enabled, uses GlobalOpenTelemetry.get() if no OpenTelemetry instance is set.
+ * Tracing interceptors are automatically added to all producers and consumers.
+ */
+ClientBuilder enableTracing(boolean tracingEnabled);
+```
+
+**New public classes** (`org.apache.pulsar.client.impl.tracing`):
+- `OpenTelemetryProducerInterceptor`: Producer interceptor for tracing
+- `OpenTelemetryConsumerInterceptor<T>`: Consumer interceptor for tracing
+- `TracingContext`: Utility methods for span creation
+- `TracingProducerBuilder`: Helper for manual trace context injection
+
+**Modified classes**:
+- `Message` implementations (`MessageImpl`, `TopicMessageImpl`): Now implement `TraceableMessage`
+- `MessageId` implementations (`MessageIdImpl`, `TopicMessageIdImpl`): Now implement `TraceableMessageId`
+
+### Binary protocol
+
+No changes to binary protocol. Trace context is propagated via existing
message properties mechanism.
+
+### Configuration
+
+**New ClientBuilder options**:
+- `enableTracing(boolean)`: Enable automatic tracing
+
+**Example configuration**:
+
+```java
+// Option 1: Explicit OpenTelemetry instance with tracing enabled
+OpenTelemetry otel = OpenTelemetrySdk.builder()
+ .setTracerProvider(tracerProvider)
+ .build();
+
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .openTelemetry(otel)
+ .enableTracing(true)
+ .build();
+
+// Option 2: Use GlobalOpenTelemetry
+PulsarClient globalOtelClient = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .enableTracing(true) // Uses GlobalOpenTelemetry
+ .build();
+
+// Option 3: Manual interceptor (advanced)
+Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic("my-topic")
+ .intercept(new OpenTelemetryProducerInterceptor())
+ .create();
+```
+
+### CLI
+
+No CLI changes. This is a client library feature.
+
+### Metrics
+
+This PIP focuses on distributed tracing, not metrics. No new metrics are added.
+
+# Monitoring
+
+Users can monitor tracing effectiveness through their OpenTelemetry backend
(e.g., Jaeger, Zipkin, Grafana Tempo):
+
+## Key Monitoring Aspects
+
+1. **Span creation rate**: Monitor the rate of producer and consumer spans to
ensure tracing is active
+2. **Trace completeness**: Verify traces show complete paths from producer to
consumer
+3. **Error spans**: Monitor spans with ERROR status to identify failures
+4. **Span duration**: Analyze span durations to identify performance
bottlenecks:
+ - Long producer spans may indicate slow broker acknowledgments
+ - Long consumer spans may indicate slow message processing
+
+## Recommended Dashboards
+
+1. **Producer Performance**:
+ - Track `send` span durations by topic
+ - Alert on high error rates
+ - Monitor throughput (spans per second)
+
+2. **Consumer Performance**:
+ - Track `process` span durations by topic
+ - Monitor acknowledgment latency
+ - Alert on negative acknowledgment rates
+
+3. **End-to-End Latency**:
+ - Visualize complete traces from producer to consumer
+ - Identify bottlenecks in the message flow
+ - Track latency percentiles (p50, p95, p99)
+
+# Security Considerations
+
+This feature does not introduce new security concerns:
+
+1. **Trace context in properties**: Trace context (`traceparent`,
`tracestate`) is stored in message properties, which are part of the existing
message structure. No additional authentication or authorization is needed.
+
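+2. **No sensitive data**: `traceparent` contains only the trace ID, span ID, and trace flags, and `tracestate` contains vendor-specific trace information. The tracing interceptors do not add message payloads or other user data to either property.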
+
+3. **OpenTelemetry instance**: The `OpenTelemetry` instance is provided by the
application and follows the same security model as other client configuration.
+
+4. **Multi-tenancy**: Tracing respects existing Pulsar multi-tenancy
boundaries. Trace context is scoped to individual messages and does not leak
across tenants.
+
+5. **No new endpoints**: This feature does not add new HTTP endpoints or
protocol commands.
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+**Fully backward compatible**. No breaking changes:
+
+1. **Default behavior unchanged**: Tracing is disabled by default. Existing
applications work without modification.
+2. **Serialization compatible**: New interfaces use `transient` fields that
don't affect serialization.
+3. **Message format unchanged**: Trace context uses existing message
properties mechanism.
+4. **Interceptor compatible**: Works alongside existing user interceptors.
+
+**Upgrade steps**:
+1. Upgrade client library to version containing this feature
+2. Optionally enable tracing via `ClientBuilder.enableTracing(true)`
+3. Configure OpenTelemetry SDK and exporters if using programmatic
configuration
+
+## Downgrade / Rollback
+
+**Fully compatible with downgrade**:
+
+1. **Message compatibility**: Messages sent with trace context can be received
by older clients (they ignore unknown properties)
+2. **No schema changes**: No changes to message schema or protocol
+3. **Graceful degradation**: Older clients simply don't create spans but can
still process messages
+
+**Rollback steps**:
+1. Downgrade client library to previous version
+2. Tracing will stop, but message flow continues normally
+3. Existing traces may be incomplete if some clients are downgraded
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+**No impact on geo-replication**:
+
+1. **Trace context preserved**: Message properties (including trace context)
are replicated across clusters
+2. **Mixed versions**: Clusters can run different client versions. Trace
context propagates through older brokers without issues.
+3. **No broker changes**: This is a client-only feature. Broker version
doesn't matter.
+
+**Considerations**:
+- Traces may span multiple clusters, providing visibility into geo-replication
latency
+- If some clusters use tracing and others don't, traces will have gaps but
remain functional
+- Trace context continues across cluster boundaries via message properties
+
+# Alternatives
+
+## Alternative 1: OpenTelemetry Java Agent Only
+
+**Approach**: Only support tracing via OpenTelemetry Java Agent automatic
instrumentation.
+
+**Rejected because**:
+- Requires agent deployment, not suitable for all environments
+- No programmatic control over tracing behavior
+- Harder to debug and customize
+- Not aligned with other Apache projects that provide native support
+
+## Alternative 2: Custom Tracing API
+
+**Approach**: Design a custom Pulsar-specific tracing API instead of using
OpenTelemetry.
+
+**Rejected because**:
+- OpenTelemetry is the industry standard
+- Custom API would require additional exporters and integrations
+- Would not work with existing OpenTelemetry ecosystem
+- Increases maintenance burden
+
+## Alternative 3: Span Storage in Message Properties
+
+**Approach**: Store spans directly in message properties instead of using
separate `TraceableMessage` interface.
+
+**Rejected because**:
+- Spans are not serializable
+- Would require serializing span context for every message (overhead)
+- Less clean API design
+- Harder to integrate with cumulative acknowledgment
+
+## Alternative 4: Per-Message Acknowledgment Only
+
+**Approach**: Only support individual acknowledgment, not cumulative
acknowledgment.
+
+**Rejected because**:
+- Cumulative acknowledgment is a key Pulsar feature
+- Would leave spans unclosed until timeout
+- Poor user experience for Failover/Exclusive subscriptions
+- Incomplete tracing for common use cases
+
+# General Notes
+
+## Performance Considerations
+
+The implementation is designed for minimal overhead:
+
+1. **Zero overhead when disabled**: No performance impact when tracing is not
enabled
+2. **Lazy initialization**: Span maps only created for Failover/Exclusive
subscriptions
+3. **Efficient data structures**: `ConcurrentSkipListMap` for O(log n) range
operations
+4. **Transient fields**: Spans not serialized with messages
+5. **Batching**: OpenTelemetry SDK batches span exports by default
+
+## Testing
+
+Comprehensive testing includes:
+
+1. **Unit tests**: `OpenTelemetryTracingTest` verifies span creation,
attributes, and context propagation
+2. **Example tests**: `TracingExampleTest` demonstrates usage patterns
+3. **Integration tests**: Manual testing with Jaeger backend
+4. **Compatibility tests**: Verified with OpenTelemetry Java Agent
+
+## Documentation
+
+User documentation provided in:
+- `pulsar-client/TRACING.md`: Comprehensive tracing guide
+- Javadoc comments on all public APIs
+- Code examples in test classes
+
+# Links
+
+* Mailing List discussion thread: https://lists.apache.org/thread/61f5zxsc8ydcbjllwbp2x6y5g4f1xj5m
+* Mailing List voting thread: https://lists.apache.org/thread/yv455tyyyrpr3j6o1lzx8k9bq4d0ko5m