[ https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kamal Chandraprakash updated KAFKA-7483:
----------------------------------------
    Description:
We are storing schema metadata for the record key and value in the header. The serializer includes this metadata in the record header. When doing a simple record transformation (x transformed to y) in Streams, the same header that was passed from the source is pushed to the sink topic. This leads to an error while reading the sink topic.

We should call the overloaded `serialize(topic, headers, object)` method [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156], which in turn adds the correct metadata to the record header.

With this change, the sink topic reader has the option to read all the values for a header key using `Headers#headers`, or only the most recently added value using `Headers#lastHeader`.

  was:
We are storing schema metadata for the record key and value in the header. The serializer includes this metadata in the record header. When doing a simple record transformation (x transformed to y) in Streams, the same header that was passed from the source is pushed to the sink topic. This leads to an error while reading the sink topic.

We should call the overloaded `serialize(topic, headers, object)` method in org.apache.kafka.streams.processor.internals.RecordCollectorImpl#L156, #L157, which in turn adds the correct metadata to the record header.

With this change, the sink topic reader has the option to read all the values for a header key using `Headers#headers`, or only the most recently added value using `Headers#lastHeader`.

> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are storing schema metadata for the record key and value in the header.
> The serializer includes this metadata in the record header. When doing a simple
> record transformation (x transformed to y) in Streams, the same header that
> was passed from the source is pushed to the sink topic. This leads to an error
> while reading the sink topic.
> We should call the overloaded `serialize(topic, headers, object)` method
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156],
> which in turn adds the correct metadata to the record header.
> With this change, the sink topic reader has the option to read all the values
> for a header key using `Headers#headers`, or only the most recently added
> value using `Headers#lastHeader`.
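
To illustrate the header behaviour described in this issue, here is a minimal stand-alone sketch. It does not use the Kafka classes: `SimpleHeaders`, `Header`, the `serialize` helper and the `value.schema` header key are hypothetical stand-ins that only mimic `Headers#add`, `Headers#headers`, `Headers#lastHeader` and the `serialize(topic, headers, object)` overload. It assumes the serializer appends its own schema header instead of replacing the one inherited from the source record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT Kafka classes: they only mimic the append-only
// behaviour of record headers and a header-aware serialize overload.
final class HeaderSketch {

    /** One key/value header entry. */
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Append-only header collection; adding never removes earlier entries. */
    static final class SimpleHeaders {
        private final List<Header> entries = new ArrayList<>();

        SimpleHeaders add(String key, String value) {
            entries.add(new Header(key, value));
            return this;
        }

        /** All entries for the key, in insertion order. */
        List<Header> headers(String key) {
            List<Header> out = new ArrayList<>();
            for (Header h : entries) {
                if (h.key.equals(key)) {
                    out.add(h);
                }
            }
            return out;
        }

        /** The most recently added entry for the key, or null if absent. */
        Header lastHeader(String key) {
            for (int i = entries.size() - 1; i >= 0; i--) {
                if (entries.get(i).key.equals(key)) {
                    return entries.get(i);
                }
            }
            return null;
        }
    }

    // Assumed header name, chosen for illustration only.
    static final String SCHEMA_KEY = "value.schema";

    /** Header-aware serialize: records the value's type name, then encodes it. */
    static byte[] serialize(String topic, SimpleHeaders headers, Object data) {
        headers.add(SCHEMA_KEY, data.getClass().getSimpleName());
        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Header inherited from the source record (value x was a String).
        SimpleHeaders headers = new SimpleHeaders().add(SCHEMA_KEY, "String");

        // After the transformation the value y is an Integer; the sink-side
        // serializer appends the matching schema header.
        serialize("sink-topic", headers, Integer.valueOf(42));

        System.out.println(headers.headers(SCHEMA_KEY).size());   // 2
        System.out.println(headers.lastHeader(SCHEMA_KEY).value); // Integer
    }
}
```

A reader that only needs the schema of the transformed value would use the last-header lookup, while one that wants the full history can iterate over every value stored under the key.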