[ https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640073#comment-16640073 ]
Guozhang Wang commented on KAFKA-7483:
--------------------------------------

[~ckamal] I think this is a good one to add, and it should be straightforward as well. Compatibility-wise, I do not see any obvious issues: the default implementation of `serialize(String topic, Headers headers, T data)` ignores the headers, so users who do not serialize headers anyway should not see any surprises.

Would you like to submit a PR for this, along with some unit tests to make sure the logic is added properly?

> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are storing schema metadata for the record key and value in the header.
> The serializer includes this metadata in the record header. During a simple
> record transformation (x transformed to y) in Streams, the same header that
> was passed from the source is pushed to the sink topic unchanged. This leads
> to an error while reading the sink topic.
> We should call the overloaded `serialize(topic, headers, object)` method in
> [RecordCollectorImpl|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156],
> which in turn adds the correct metadata to the record header.
> With this, the sink topic reader has the option to read all the values for a
> header key using `Headers#headers`, or only the overwritten value using
> `Headers#lastHeader`.
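
The compatibility argument and the reader-side behaviour described above can be illustrated with a minimal sketch. It assumes simplified stand-in types (`Header`, `Headers`, `Serializer`) that only mirror the shape of the Kafka interfaces; they are not the real classes and this is not the actual `RecordCollectorImpl` change. `SchemaTaggingSerializer` and the `schema.id` header key are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeaderAwareSerializerSketch {

    // Simplified stand-in for a record header: a key plus a raw value.
    static final class Header {
        private final String key;
        private final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
        String key() { return key; }
        byte[] value() { return value; }
    }

    // Simplified stand-in for record headers: append-only, duplicate keys allowed.
    static final class Headers {
        private final List<Header> entries = new ArrayList<>();

        Headers add(String key, byte[] value) {
            entries.add(new Header(key, value));
            return this;
        }

        // All headers stored under the key, in insertion order.
        List<Header> headers(String key) {
            List<Header> matches = new ArrayList<>();
            for (Header h : entries) {
                if (h.key().equals(key)) matches.add(h);
            }
            return matches;
        }

        // The most recently added header for the key, or null if none exists.
        Header lastHeader(String key) {
            for (int i = entries.size() - 1; i >= 0; i--) {
                if (entries.get(i).key().equals(key)) return entries.get(i);
            }
            return null;
        }
    }

    // Stand-in serializer contract: the header-aware overload defaults to
    // ignoring the headers, so existing serializers keep their behaviour.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, Headers headers, T data) {
            return serialize(topic, data);
        }
    }

    // Hypothetical serializer that writes its own schema id into the headers.
    static final class SchemaTaggingSerializer implements Serializer<String> {
        private final String schemaId;
        SchemaTaggingSerializer(String schemaId) { this.schemaId = schemaId; }

        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serialize(String topic, Headers headers, String data) {
            headers.add("schema.id", schemaId.getBytes(StandardCharsets.UTF_8));
            return serialize(topic, data);
        }
    }

    static String text(Header header) {
        return new String(header.value(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Header carried over from the source record.
        Headers headers = new Headers()
                .add("schema.id", "source-schema".getBytes(StandardCharsets.UTF_8));

        // A serializer that never overrides the overload leaves headers untouched.
        Serializer<String> plain = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        plain.serialize("sink", headers, "y");
        System.out.println(headers.headers("schema.id").size());   // 1

        // A header-aware serializer appends the metadata for the sink record.
        new SchemaTaggingSerializer("sink-schema").serialize("sink", headers, "y");
        System.out.println(headers.headers("schema.id").size());   // 2
        System.out.println(text(headers.lastHeader("schema.id"))); // sink-schema
    }
}
```

In this sketch the source header is kept and the sink serializer appends a second value under the same key, so a reader can either inspect every value or take only the latest one, which matches the two read options mentioned in the issue description.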