[ 
https://issues.apache.org/jira/browse/FLINK-29361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Hao closed FLINK-29361.
---------------------------
    Resolution: Not A Problem

> How to set headers with the new Flink KafkaSink
> -----------------------------------------------
>
>                 Key: FLINK-29361
>                 URL: https://issues.apache.org/jira/browse/FLINK-29361
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Xin Hao
>            Priority: Minor
>
> I'm using Flink 1.15.2, when I try to migrate to the new KafkaSink, it seems 
> that it's not possible to add Kafka record headers.
> I think we should add this feature or document it if we already have it.
>  
> Below code is what we can do with FlinkKafkaProducer and ProducerRecord
> {code:java}
> public class SomeKafkaSerializationSchema<T extends SpecificRecordBase>  
> implements KafkaSerializationSchema<T> {
>   ...
>   @Override  public ProducerRecord<byte[], byte[]> serialize(T t, Long ts) {
>     ...    
>     var record = ProducerRecord<byte[], byte[]>(topic, some_bytes_a);
>     record.headers().add("id", some_bytes_b);    
>     return record;  
>   }}
> ...
> var producer = new FlinkKafkaProducer<>(
>   topic,  
>   new SomeKafkaSerializationSchema<>(...),  
>   producerProps,  
>   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
> );
>  {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to