Philipp Schirmer created KAFKA-20462:
----------------------------------------

             Summary: ErrorHandlerContext does not provide access to original headers
                 Key: KAFKA-20462
                 URL: https://issues.apache.org/jira/browse/KAFKA-20462
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 4.2.0
            Reporter: Philipp Schirmer


With Kafka 4.2.0, the ProcessingExceptionHandler can be used to send records 
to a dead letter queue (DLQ). The ErrorHandlerContext provides access to the 
raw source key and value of a message, as well as the record headers, so that 
a record can be sent to the DLQ serialized in the same way as the source 
record. However, these headers are not the original headers from before 
deserialization. If a Deserializer (and possibly part of the topology) 
modifies the headers, they differ from those of the source record, and the 
DLQ record cannot be deserialized if the deserializer depends on the headers.

We noticed this problem when testing the DLQ with our LargeMessageDeserializer 
[https://github.com/bakdata/kafka-large-message-serde/blob/60b3864c5e6f1b897bd29c700c4ed0ffdbb486b0/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L80].
We remove the serde-specific headers after deserialization because they are 
only meaningful to the deserializer, and we do not want to pass them on to 
all downstream records that are sent based on this source record (Kafka 
Streams passes on headers by default). Therefore, we think that removing 
serde-specific headers in the Deserializer is good practice.

In order to retain this behavior, the ErrorHandlerContext should provide access 
to the original headers before any deserialization takes place.
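
To illustrate the request, here is a minimal, dependency-free sketch of the 
problem and of the proposed snapshot. It does not use the Kafka API: the Map 
stands in for the record headers, and the class name, the header name 
"__example.serde.flag" and the helper methods are hypothetical, chosen only 
for this example.

```java
import java.util.HashMap;
import java.util.Map;

final class OriginalHeadersSketch {

    // Hypothetical header name standing in for a serde-specific header.
    static final String SERDE_HEADER = "__example.serde.flag";

    // Simplified stand-in for a Deserializer that depends on a header
    // and removes it after deserialization.
    static String deserialize(Map<String, String> headers, String rawValue) {
        if (!headers.containsKey(SERDE_HEADER)) {
            throw new IllegalStateException("missing header " + SERDE_HEADER);
        }
        headers.remove(SERDE_HEADER); // mutates the shared header collection
        return rawValue.toUpperCase();
    }

    // Checks whether a DLQ record carrying these headers could be read again.
    // Works on a copy so that the check itself does not modify the input.
    static boolean canDeserialize(Map<String, String> headers, String rawValue) {
        try {
            deserialize(new HashMap<>(headers), rawValue);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> liveHeaders = new HashMap<>();
        liveHeaders.put(SERDE_HEADER, "1");
        liveHeaders.put("trace-id", "abc");
        String rawValue = "payload";

        // Proposed behavior: copy the headers before deserialization runs.
        Map<String, String> originalHeaders = new HashMap<>(liveHeaders);

        deserialize(liveHeaders, rawValue);

        // DLQ record built from the headers as they are after deserialization.
        System.out.println(canDeserialize(liveHeaders, rawValue));     // false
        // DLQ record built from the snapshot taken before deserialization.
        System.out.println(canDeserialize(originalHeaders, rawValue)); // true
    }
}
```

In this model, only the snapshot taken before deserialization yields a DLQ 
record that the same deserializer can read again, which is the behavior we 
would like the ErrorHandlerContext to support.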

As a workaround, the change in 
[https://github.com/bakdata/kafka-large-message-serde/pull/179] would work for 
us, but we would need to retain the headers in the Kafka message.

As a side note, the headers can be accessed through both the 
ErrorHandlerContext and the Record passed to the ProcessingExceptionHandler. 
The Record headers can very well be those that have been modified by the 
Deserializer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
