[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458325#comment-16458325
 ] 

Raghu Angadi commented on BEAM-4038:
------------------------------------

Interfaces with function callbacks are problematic because we have to decide 
what context to pass to the function. E.g. 
{{KafkaPublishTimestampFunction}} provides {{elementTimestamp}}. Why? Because we 
think that is probably what the user wants to know along with the KV. Recently we 
deprecated the old timestamp functions for the reader in order to support watermarks 
better. At least in the case of the reader, there is no alternative to a 
function callback, since we need to set the watermark/timestamp _before_ the user 
gets to see the record. 
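
To make the concern concrete, here is a minimal sketch. All types in it are hypothetical stand-ins, not the real Beam or Kafka interfaces (in particular, this is not the actual {{KafkaPublishTimestampFunction}} signature). It only shows that the callback's parameter list is fixed by the writer, so exposing anything new, such as headers, would need another interface or a changed signature.

```java
// Hypothetical stand-ins for illustration only; not Beam or Kafka types.
final class CallbackContextSketch {

  /** The parameter list is all the context a user function will ever receive. */
  interface PublishTimestampFn<K, V> {
    long timestampFor(K key, V value, long elementTimestamp);
  }

  /** Minimal outgoing record, filled in by the writer rather than by the user. */
  static final class OutRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;

    OutRecord(K key, V value, long timestamp) {
      this.key = key;
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /** The writer chooses what to hand to the callback; the user cannot ask for more. */
  static <K, V> OutRecord<K, V> publish(
      K key, V value, long elementTimestamp, PublishTimestampFn<K, V> fn) {
    return new OutRecord<>(key, value, fn.timestampFor(key, value, elementTimestamp));
  }

  private CallbackContextSketch() {}
}
```

With this shape, a user function can influence the timestamp but has nowhere to put a header.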

Long story short, I think it is better to avoid adding another function to set 
headers. It will be the same story when more fields are added to 
{{KafkaRecord}}. In fact, I think we should remove 
{{KafkaPublishTimestampFunction}}. 

How about adding {{KafkaIO.writeRecords()}}, which is a 
{{PTransform<PCollection<ProducerRecord<K, V>>, PDone>}}? This way the user builds the 
{{ProducerRecord}} any way they see fit. We can provide an Avro coder for Kafka's 
{{ProducerRecord}}. We can handle older Kafka versions by ignoring fields that 
are not present in those versions. We can add a coder very similar to 
{{KafkaRecordCoder}}.
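
A rough sketch of the idea, using only hypothetical stand-in types ({{SketchRecord}} and this {{writeRecords}} method are made up for illustration and are not Kafka's {{ProducerRecord}} or any Beam API). The user fills in every field of the record, and the sink forwards it as built, dropping fields that an older client would not understand.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the Beam or Kafka classes.
final class WriteRecordsSketch {

  /** Stand-in for a producer record: the user sets every field directly. */
  static final class SketchRecord {
    final String topic;
    final String key;
    final String value;
    final Long timestamp;               // null means "no explicit timestamp"
    final Map<String, byte[]> headers;  // empty when the user sets no headers

    SketchRecord(
        String topic, String key, String value, Long timestamp, Map<String, byte[]> headers) {
      this.topic = topic;
      this.key = key;
      this.value = value;
      this.timestamp = timestamp;
      // Defensive copy so later changes to the caller's map do not leak in.
      this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }
  }

  /**
   * Stand-in for the sink. No callback is involved: records are forwarded as the
   * user built them. Headers are dropped when the (assumed) client is too old.
   */
  static List<SketchRecord> writeRecords(List<SketchRecord> input, boolean clientSupportsHeaders) {
    List<SketchRecord> sent = new ArrayList<>();
    for (SketchRecord r : input) {
      Map<String, byte[]> headers =
          clientSupportsHeaders ? r.headers : Collections.<String, byte[]>emptyMap();
      sent.add(new SketchRecord(r.topic, r.key, r.value, r.timestamp, headers));
    }
    return sent;
  }

  private WriteRecordsSketch() {}
}
```

When a new field is added to the record, only the record type and its coder change; the writer's interface stays the same.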

This is more work than adding a function, but I think it improves the flexibility of 
the writer, both now and in the future. 

> Support Kafka Headers in KafkaIO
> --------------------------------
>
>                 Key: BEAM-4038
>                 URL: https://issues.apache.org/jira/browse/BEAM-4038
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Geet Kumar
>            Assignee: Geet Kumar
>            Priority: Minor
>             Fix For: 2.5.0
>
>          Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
