[ 
https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639930#comment-16639930
 ] 

John Roesler commented on KAFKA-7483:
-------------------------------------

Hi [~ckamal],

Thanks for the feature request!

This was the recent change (2.0) that allowed record headers to flow through 
the topology:
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API]
 
 * aka KAFKA-6850

It seems like this request is a straightforward extension of that work

It also doesn't seem like this would affect any public APIs, thus it would not 
require a KIP.

 

Offhand, it does seem like a simple, beneficial change... As you noted, we'd 
just call the extended serializer method in RecordCollectorImpl.

It would probably be good to search the Streams codebase for other invocations 
of `serialize` just to make sure the feature is airtight.

 

 

If you want to, you're welcome to send a PR with the proposed change!

 

Thanks,

-John

> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are storing schema metadata for record key and value in the header. 
> Serializer, includes this metadata in the record header. While doing simple 
> record transformation (x transformed to y) in streams, the same header that 
> was passed from source, pushed to the sink topic. This leads to error while 
> reading the sink topic.
> We should call the overloaded `serialize(topic, headers, object)` method in 
> [RecordCollectorImpl|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156]
>  which in-turn adds the correct metadata in the record header.
> With this sink topic reader have the option to read all the values for a 
> header key using `Headers#headers`  [or] only the overwritten value using 
> `Headers#lastHeader`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to