[ https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639930#comment-16639930 ]
John Roesler commented on KAFKA-7483:
-------------------------------------

Hi [~ckamal],

Thanks for the feature request!

This was the recent change (2.0) that allowed record headers to flow through the topology:
* [https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API]
* aka KAFKA-6850

It seems like this request is a straightforward extension of that work. It also doesn't seem like this would affect any public APIs, so it would not require a KIP.

Offhand, it does seem like a simple, beneficial change. As you noted, we'd just call the extended serializer method in RecordCollectorImpl. It would probably be good to search the Streams codebase for other invocations of `serialize` just to make sure the feature is airtight.

If you want to, you're welcome to send a PR with the proposed change!

Thanks,
-John

> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are storing schema metadata for the record key and value in the header.
> The serializer includes this metadata in the record header. While doing a simple
> record transformation (x transformed to y) in Streams, the same header that
> was passed from the source is pushed to the sink topic. This leads to an error while
> reading the sink topic.
> We should call the overloaded `serialize(topic, headers, object)` method in
> [RecordCollectorImpl|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156]
> which in turn adds the correct metadata to the record header.
> With this, the sink topic reader has the option to read all the values for a
> header key using `Headers#headers`, or only the latest value using
> `Headers#lastHeader`.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
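
To illustrate the behaviour described in the issue, here is a minimal, dependency-free sketch of why a header-aware `serialize(topic, headers, object)` call leaves the reader with both options. It is not Kafka code: `SimpleHeaders`, `Header`, the `"schema"` key and the `serialize` helper are hypothetical stand-ins, written on the assumption that adding a header appends a value rather than replacing earlier ones for the same key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; these are NOT the Kafka client classes.
public class HeaderSketch {

    // One key/value header entry.
    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    // Assumption: add() appends, so earlier values for the same key are kept.
    static final class SimpleHeaders {
        private final List<Header> entries = new ArrayList<>();

        SimpleHeaders add(String key, byte[] value) {
            entries.add(new Header(key, value));
            return this;
        }

        // All values for a key, oldest first (analogous to `Headers#headers`).
        List<Header> headers(String key) {
            List<Header> matches = new ArrayList<>();
            for (Header h : entries) {
                if (h.key.equals(key)) matches.add(h);
            }
            return matches;
        }

        // Most recently added value for a key, or null (analogous to `Headers#lastHeader`).
        Header lastHeader(String key) {
            for (int i = entries.size() - 1; i >= 0; i--) {
                if (entries.get(i).key.equals(key)) return entries.get(i);
            }
            return null;
        }
    }

    // Hypothetical header-aware serializer shaped like serialize(topic, headers, object):
    // it records the schema of the value it is writing before returning the bytes.
    static byte[] serialize(String topic, SimpleHeaders headers, String data) {
        headers.add("schema", "y-schema".getBytes(StandardCharsets.UTF_8));
        return data.getBytes(StandardCharsets.UTF_8);
    }

    static String text(Header h) {
        return new String(h.value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Header carried over from the source record (schema of x).
        SimpleHeaders headers = new SimpleHeaders()
                .add("schema", "x-schema".getBytes(StandardCharsets.UTF_8));

        // The sink serializes y and appends the schema of y.
        serialize("sink-topic", headers, "y");

        System.out.println(headers.headers("schema").size());   // prints 2
        System.out.println(text(headers.lastHeader("schema"))); // prints y-schema
    }
}
```

In this sketch a reader that only cares about the schema of the written value would use `lastHeader`, while one that wants the full history would iterate over `headers`.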