Dear Kafka Development Community,

I propose enhancing the Kafka ProducerInterceptor interface, specifically
to make Record Headers available in the onAcknowledgement method. I would
appreciate your feedback on this proposal. If the feedback is positive, I
will follow up with a detailed discussion on implementing this feature.

*Current State*

At present, the topic, partition, offset, and timestamp are available in
the onAcknowledgement method. However, headers are not accessible.

*Why This Feature Is Important*

Two primary use cases highlight the importance of making headers available
in the onAcknowledgement method:

1. *Latency Measurement*

Latency measurement is crucial for understanding the time taken for
messages to travel from the producer to Kafka and back to the producer as
an acknowledgment. The current setup does not allow for precise measurement
of the producer-side latency (a) in the following scenario:

```
producer send -> (a) -> Kafka -> (b) -> acknowledge
```

   - If using CreateTime, the calculation is: `now - message.timestamp =
(a) + (b)`

   - If using LogAppendTime, the calculation is: `now - message.timestamp =
(b)`

By making headers available in onAcknowledgement, we can include a
timestamp in the header when the message is sent. This allows us to
calculate the producer-side latency (a) as `message.timestamp -
CreateTimeFromHeader` when using LogAppendTime.

2. *Tracing Completeness*

In distributed systems, tracing is essential for tracking the flow of
messages and understanding system behavior. The trace ID is typically
stored in the message headers. Having access to headers in the
onAcknowledgement method would enable us to add spans indicating when the
message arrives at Kafka and when it is acknowledged by the client. This
would significantly enhance tracing completeness and accuracy.

*Why Not Use Callback on Send*

While it is possible to use the Callback mechanism provided by the send
method, this approach has limitations:

- *Injectability*: In some cases, such as with Debezium and Flink Kafka
sink, the Callback is not injectable, making it impractical to use this
method for the aforementioned purposes.

I believe that adding Record Headers to the onAcknowledgement method in
ProducerInterceptor would significantly enhance Kafka's capabilities in
latency measurement and tracing, and also open up new opportunities for
further improvements and innovations.

I look forward to your feedback on this proposal. If the community is in
favor, I will provide a detailed follow-up on how this feature can be
implemented. While KIP-512 was considered, we can discuss other options in
the next follow-up.

Thank you for your consideration.

Best regards,
Rich Chen

Reply via email to