lhotari commented on code in PR #24857:
URL: https://github.com/apache/pulsar/pull/24857#discussion_r2435622413


##########
pip/pip-446.md:
##########
@@ -0,0 +1,532 @@
+# PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java Client
+
+# Background knowledge
+
+## OpenTelemetry
+
+OpenTelemetry is a vendor-neutral observability framework that provides APIs, 
SDKs, and tools for collecting distributed traces, metrics, and logs. It has 
become the industry standard for observability, adopted by major cloud 
providers and APM vendors.
+
+## Distributed Tracing
+
+Distributed tracing tracks requests as they flow through distributed systems. 
A **trace** represents the entire journey of a request, composed of multiple 
**spans**. Each span represents a single operation (e.g., sending a message, 
processing a request). Spans form parent-child relationships, creating a trace 
tree that visualizes request flow across services.
+
+## W3C Trace Context
+
+The W3C Trace Context specification defines a standard way to propagate trace 
context across service boundaries using HTTP headers or message properties:
+- `traceparent`: Contains trace ID, span ID, and trace flags
+- `tracestate`: Contains vendor-specific trace information
+
+## Pulsar Interceptors
+
+Pulsar client interceptors allow users to intercept and modify messages before 
sending (producer) or after receiving (consumer). They provide hooks for 
cross-cutting concerns like tracing, metrics, and security.
+
+## Cumulative Acknowledgment
+
+In Pulsar, cumulative acknowledgment allows consumers to acknowledge all 
messages up to a specific message ID in one operation. This is only available 
for Failover and Exclusive subscription types where message order is 
guaranteed. When a message is cumulatively acknowledged, all previous messages 
on that partition are implicitly acknowledged.
+
+# Motivation
+
+Currently, the Pulsar Java client lacks native support for distributed tracing 
with OpenTelemetry. While the OpenTelemetry Java Agent can automatically 
instrument Pulsar clients, there are several limitations:
+
+1. **Agent-only approach**: Users must use the Java Agent, which may not be 
suitable for all deployment scenarios (e.g., serverless, embedded applications)
+2. **Limited control**: Users cannot easily customize tracing behavior or 
selectively enable tracing for specific producers/consumers
+3. **Missing first-class support**: Other Apache projects (Kafka, Camel) 
provide native OpenTelemetry support, making Pulsar less competitive
+4. **Complex setup**: Users must understand agent configuration and classpath 
setup
+
+Native OpenTelemetry support would:
+- Provide a programmatic API for tracing configuration
+- Enable selective tracing without agent overhead
+- Improve observability in production systems
+- Align Pulsar with modern observability practices
+- Make it easier to diagnose performance issues and message flow
+
+# Goals
+
+## In Scope
+
+1. **Producer tracing**: Create spans for message send operations with 
automatic trace context injection
+2. **Consumer tracing**: Create spans for message receive/process operations 
with automatic trace context extraction
+3. **Trace context propagation**: Inject and extract W3C Trace Context via 
message properties
+4. **Programmatic API**: Enable tracing via `ClientBuilder` API
+5. **Interceptor-based design**: Implement using Pulsar's existing interceptor 
mechanism
+6. **Cumulative acknowledgment support**: Properly handle span lifecycle for 
cumulative acks
+7. **Multi-topic consumer support**: Track spans across multiple topic 
partitions
+8. **Agent compatibility**: Ensure compatibility with OpenTelemetry Java Agent
+9. **Semantic conventions**: Follow OpenTelemetry messaging semantic 
conventions
+10. **Zero overhead when disabled**: No performance impact when tracing is not 
enabled
+
+## Out of Scope
+
+1. **Broker-side tracing**: This PIP focuses on client-side tracing only
+2. **Metrics collection**: Only distributed tracing, not OpenTelemetry metrics
+3. **Log correlation**: Only tracing integration, not log integration
+4. **Custom propagators**: Only W3C Trace Context format supported initially
+5. **Transaction tracing**: Tracing for Pulsar transactions (future 
enhancement)
+6. **Schema registry tracing**: Tracing for schema operations
+7. **Admin API tracing**: Tracing for admin operations
+
+# High Level Design
+
+The implementation adds native OpenTelemetry tracing to the Pulsar Java client 
through:
+
+## 1. New Interfaces
+
+Two new interfaces enable attaching tracing spans to messages and message IDs:
+- `TraceableMessage`: Allows messages to carry OpenTelemetry spans
+- `TraceableMessageId`: Allows message IDs to carry OpenTelemetry spans
+
+## 2. Interceptors
+
+Two interceptors implement the tracing logic:
+- `OpenTelemetryProducerInterceptor`: Creates producer spans and injects trace 
context
+- `OpenTelemetryConsumerInterceptor`: Creates consumer spans and extracts 
trace context
+
+## 3. Configuration API
+
+Users can enable tracing through the `ClientBuilder`:
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://localhost:6650")
+    .openTelemetry(openTelemetry)
+    .enableTracing(true)
+    .build();
+```
+
+When enabled, the client automatically adds tracing interceptors to all 
producers and consumers.
+
+## 4. Trace Context Propagation
+
+The implementation uses W3C Trace Context format to propagate trace context:
+- **Producer**: Injects `traceparent` and `tracestate` into message properties
+- **Consumer**: Extracts trace context from message properties
+
+This enables end-to-end tracing across services that communicate via Pulsar.
+
+## 5. Span Lifecycle Management
+
+The implementation carefully manages span lifecycle:
+- **Producer spans**: Start on send, end on broker acknowledgment (or error)
+- **Consumer spans**: Start on receive, end on acknowledgment (or negative ack)
+- **Cumulative ack**: Ends all spans for messages up to the acknowledged 
position
+
+## 6. Multi-Topic Support
+
+For multi-topic consumers, the implementation maintains separate span maps per 
topic partition to correctly handle cumulative acknowledgments across multiple 
topics.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### 1. Traceable Interfaces
+
+**TraceableMessage interface** (`pulsar-client-api`):
+```java
+public interface TraceableMessage {
+    void setTracingSpan(io.opentelemetry.api.trace.Span span);
+    io.opentelemetry.api.trace.Span getTracingSpan();
+}
+```
+
+**TraceableMessageId interface** (`pulsar-client-api`):
+```java
+public interface TraceableMessageId {
+    void setTracingSpan(io.opentelemetry.api.trace.Span span);
+    io.opentelemetry.api.trace.Span getTracingSpan();
+}
+```
+
+Both `MessageImpl` and `MessageIdImpl` implement these interfaces by adding a 
transient field to store the span without affecting serialization.
+
+### 2. OpenTelemetryProducerInterceptor
+
+Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/`
+
+**Key methods**:
+- `beforeSend()`: Creates a producer span and injects trace context into 
message properties
+- `onSendAcknowledgement()`: Ends the span successfully and records message ID
+- `onPartitionsChange()`: No-op (not needed for producer)
+
+**Span creation**:
+- Uses `TracingContext.createProducerSpan()` to create a PRODUCER span
+- Span name: `send {topic}`
+- Attributes: `messaging.system`, `messaging.destination.name`, 
`messaging.operation.name`
+- Records `messaging.message.id` when broker acknowledges
+
+**Trace context injection**:
+- Uses OpenTelemetry `TextMapPropagator` to inject context into message 
properties
+- Injects `traceparent` and `tracestate` headers
+- Only injects if not already present (allows compatibility with Java Agent)
+
+### 3. OpenTelemetryConsumerInterceptor
+
+Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/`
+
+**Key methods**:
+- `beforeConsume()`: Extracts trace context and creates a consumer span
+- `onAcknowledge()`: Ends the span for individual ack with OK status
+- `onAcknowledgeCumulative()`: Ends all spans up to the acknowledged position 
with OK status
+- `onNegativeAcksSend()`: Ends the span with OK status and adds an event (not 
an error)
+- `onAckTimeoutSend()`: Ends the span with OK status and adds an event (not an 
error)
+
+**Span creation**:
+- Uses `TracingContext.createConsumerSpan()` to create a CONSUMER span
+- Span name: `process {topic}`
+- Attributes: `messaging.system`, `messaging.destination.name`, 
`messaging.operation.name`, `messaging.message.id`
+- Links to producer span via extracted trace context
+
+**Cumulative acknowledgment handling**:
+- Maintains `Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>>` for 
Failover/Exclusive subscriptions
+- Outer map key: topic partition (from `TopicMessageId.getOwnerTopic()`)
+- Inner map: sorted message IDs to spans for efficient range operations
+- When cumulative ack occurs, removes and ends all spans up to the 
acknowledged position
+- Zero overhead for Shared/Key_Shared subscriptions (map is null)
+
+**Multi-topic support**:
+- Nested map structure handles messages from multiple topic partitions
+- Each topic partition maintains independent sorted span map
+- Cumulative ack only affects spans from the same topic partition
+
+**Acknowledgment type tracking**:
+- Every consumer span includes a `messaging.pulsar.acknowledgment.type` 
attribute indicating how it was completed:
+    - `"acknowledge"`: Normal individual acknowledgment
+    - `"cumulative_acknowledge"`: Cumulative acknowledgment
+    - `"negative_acknowledge"`: Message negatively acknowledged (will be 
redelivered)
+    - `"ack_timeout"`: Acknowledgment timeout (will be redelivered)
+- Negative ack and ack timeout end spans with **OK status** (not ERROR) 
because they are normal Pulsar message flow
+- This design separates messaging operations (which succeed) from application 
logic failures (which should be tracked in separate child spans)
+- When a message is redelivered, a new consumer span is created for the new 
delivery attempt
+- The attribute allows users to query and analyze retry patterns, timeout 
issues, and acknowledgment types in their tracing backend
+
+### 4. TracingContext Utility
+
+Provides helper methods for span creation and management:
+- `createProducerSpan()`: Creates a producer span with correct attributes
+- `createConsumerSpan()`: Creates a consumer span with trace context extraction
+- `endSpan()`: Safely ends a span
+- `endSpan(span, exception)`: Ends a span with error status
+- `isValid()`: Checks if a span is valid and recording
+
+### 5. TracingProducerBuilder
+
+Helper for manual trace context injection (advanced use cases):
+- `injectContext()`: Injects trace context into message properties
+- `extractFromHeaders()`: Extracts trace context from HTTP headers
+
+### 6. ClientBuilder Integration
+
+**New API methods** (`ClientBuilder`):
+```java
+ClientBuilder openTelemetry(OpenTelemetry openTelemetry);
+ClientBuilder enableTracing(boolean tracingEnabled);
+```
+
+**Implementation** (`ClientBuilderImpl`):
+- `openTelemetry()` stores `OpenTelemetry` instance in 
`ClientConfigurationData`
+- `enableTracing()` stores `tracingEnabled` flag in `ClientConfigurationData`
+- Passes configuration to `PulsarClientImpl`
+
+**Automatic interceptor addition** (`ConsumerBuilderImpl`, 
`ProducerBuilderImpl`):
+- Checks if tracing is enabled in client configuration
+- Automatically adds appropriate interceptor if enabled
+- User-provided interceptors are preserved and combined
+
+### 7. InstrumentProvider Enhancement
+
+Enhanced to provide OpenTelemetry instance:
+```java
+public OpenTelemetry getOpenTelemetry();
+```
+
+Falls back to `GlobalOpenTelemetry.get()` if not explicitly configured.
+
+### 8. Implementation Classes
+
+**Modified classes**:
+- `MessageImpl`: Implements `TraceableMessage`
+- `MessageIdImpl`: Implements `TraceableMessageId`
+- `TopicMessageImpl`: Delegates `TraceableMessage` methods to wrapped message
+- `TopicMessageIdImpl`: Delegates `TraceableMessageId` methods to wrapped 
message ID
+- `ConsumerBase`: Provides `getSubscriptionType()` for interceptors
+- `ConsumerBuilderImpl`: Auto-adds consumer interceptor when enabled
+- `ProducerBuilderImpl`: Auto-adds producer interceptor when enabled
+- `PulsarClientImpl`: Stores and provides OpenTelemetry configuration
+- `ClientConfigurationData`: Stores OpenTelemetry and enableTracing settings
+
+### 9. Span Attributes
+
+Following [OpenTelemetry messaging semantic 
conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/):
+
+**Producer spans**:
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.operation.name`: "send"
+- `messaging.message.id`: Message ID (added on ack)
+
+**Consumer spans**:
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name

Review Comment:
   Never mind about encoding the subscription name in the destination name. It 
looks like the semantic conventions define a separate attribute for the 
subscription name: 
[`messaging.destination.subscription.name`](https://opentelemetry.io/docs/specs/semconv/registry/attributes/messaging/#messaging-destination-subscription-name), 
listed under 
https://opentelemetry.io/docs/specs/semconv/registry/attributes/messaging/#general-messaging-attributes
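
   To illustrate the point, here is a rough sketch of the consumer span 
attribute set with the subscription name carried as its own attribute rather 
than folded into the destination name. The class `ConsumerSpanAttributes`, the 
`build` helper, and the topic and subscription values are hypothetical and not 
part of the PIP. A plain `Map` stands in for the span so the sketch has no 
dependencies; in the interceptor these would presumably be set with 
`Span.setAttribute(key, value)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the PIP.
class ConsumerSpanAttributes {
    // Attribute keys from the OpenTelemetry messaging semantic conventions.
    static final String SYSTEM = "messaging.system";
    static final String DESTINATION = "messaging.destination.name";
    static final String SUBSCRIPTION = "messaging.destination.subscription.name";

    // Builds the attribute set for one consumer span. The topic name and the
    // subscription name are kept in separate attributes.
    static Map<String, String> build(String topic, String subscription) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put(SYSTEM, "pulsar");
        attrs.put(DESTINATION, topic);
        attrs.put(SUBSCRIPTION, subscription);
        return attrs;
    }

    public static void main(String[] args) {
        // Example values are made up.
        build("persistent://public/default/orders", "order-sub")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

   Keeping the two attributes separate means a tracing backend can filter by 
topic and by subscription independently.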
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
