[ https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-7483:
----------------------------------------
    Description: 
We are storing schema metadata for the record key and value in the record headers; 
the serializer includes this metadata in the headers when it writes a record. 
While doing a simple record transformation (x transformed to y) in Streams, the 
same headers that were read from the source topic are pushed unchanged to the 
sink topic, because Streams never passes the headers to the sink-side serializer. 
This leads to errors while reading the sink topic, since the header metadata no 
longer matches the serialized value.
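
For context, here is a minimal sketch of the kind of serializer involved (the class 
name, header key, and schema id are placeholders, not our actual implementation), 
assuming a client version that has the Headers-aware overload and default 
`configure`/`close` (2.1+):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative only: a value serializer that records its schema id in the
// record headers. The header key and schema id are placeholders.
public class SchemaAwareStringSerializer implements Serializer<String> {

    private static final String SCHEMA_HEADER = "value.schema.id";

    @Override
    public byte[] serialize(String topic, String data) {
        // Invoked when no headers are available; no metadata can be attached.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        // Replace any schema id copied over from the source record with the
        // id that matches the bytes actually being produced.
        headers.remove(SCHEMA_HEADER);
        headers.add(SCHEMA_HEADER, "1".getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }
}
{code}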

We should call the overloaded `serialize(topic, headers, object)` method 
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156], 
which in turn lets the serializer add the correct metadata to the record headers.
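
Roughly, the change would amount to passing the record headers through at the 
serialization call site (paraphrased, not the exact trunk source):

{code:java}
// Inside RecordCollectorImpl#send (paraphrased): hand the headers that will
// accompany the record to the serializers, so they can overwrite their metadata.
final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
{code}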

With this, a sink-topic reader has the option to read all the values for a header 
key using `Headers#headers`, or only the overwritten value using 
`Headers#lastHeader`.
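
For illustration, a sink-topic consumer could then read either view like this (the 
header key is the same placeholder as above):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative helper for a record fetched from the sink topic.
static void inspectSchemaHeaders(final ConsumerRecord<byte[], byte[]> record) {
    // All values ever written under the key, the source copy included.
    for (final Header header : record.headers().headers("value.schema.id")) {
        System.out.println("seen schema id: "
            + new String(header.value(), StandardCharsets.UTF_8));
    }

    // Only the most recently added value, i.e. the one written by the
    // Headers-aware serialize() call at the sink.
    final Header latest = record.headers().lastHeader("value.schema.id");
    if (latest != null) {
        System.out.println("effective schema id: "
            + new String(latest.value(), StandardCharsets.UTF_8));
    }
}
{code}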


> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
