[ https://issues.apache.org/jira/browse/FLINK-29361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xin Hao closed FLINK-29361.
---------------------------
    Resolution: Not A Problem

> How to set headers with the new Flink KafkaSink
> -----------------------------------------------
>
>                 Key: FLINK-29361
>                 URL: https://issues.apache.org/jira/browse/FLINK-29361
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Xin Hao
>            Priority: Minor
>
> I'm using Flink 1.15.2. When I try to migrate to the new KafkaSink, it seems
> that it's not possible to add Kafka record headers.
> I think we should add this feature, or document it if we already have it.
>
> The code below shows what we can do with FlinkKafkaProducer and ProducerRecord:
> {code:java}
> public class SomeKafkaSerializationSchema<T extends SpecificRecordBase>
>         implements KafkaSerializationSchema<T> {
>     ...
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(T t, Long ts) {
>         ...
>         // Build the record, then attach the header before returning it.
>         var record = new ProducerRecord<byte[], byte[]>(topic, some_bytes_a);
>         record.headers().add("id", some_bytes_b);
>         return record;
>     }
> }
> ...
> var producer = new FlinkKafkaProducer<>(
>     topic,
>     new SomeKafkaSerializationSchema<>(...),
>     producerProps,
>     FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
> );
> {code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
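
The notification does not say why the issue was closed as "Not A Problem". One plausible reading, offered here as an assumption rather than the reporter's stated reason, is that the new KafkaSink already permits headers: its KafkaRecordSerializationSchema also returns a ProducerRecord<byte[], byte[]>, so headers can be attached the same way as with FlinkKafkaProducer. The sketch below assumes flink-connector-kafka (1.15.x) and kafka-clients are on the classpath. HeaderSinkExample, HeaderSerializationSchema, buildSink, and the choice of String elements with the payload reused as the "id" header value are all hypothetical names and choices made for illustration.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderSinkExample {

    /** Hypothetical schema: writes each String as the record value and adds an "id" header. */
    public static class HeaderSerializationSchema
            implements KafkaRecordSerializationSchema<String> {

        private static final long serialVersionUID = 1L;

        private final String topic;

        public HeaderSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext context, Long timestamp) {
            byte[] value = element.getBytes(StandardCharsets.UTF_8);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, value);
            // Illustrative only: the header value here is just the payload bytes again.
            record.headers().add("id", value);
            return record;
        }
    }

    /** Builds an at-least-once KafkaSink that uses the header-adding schema above. */
    public static KafkaSink<String> buildSink(String bootstrapServers, String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(new HeaderSerializationSchema(topic))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```

A job would then attach the sink with something like stream.sinkTo(HeaderSinkExample.buildSink("localhost:9092", "some-topic")), where the broker address and topic name are placeholders.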