Hi Jiabao,

Thanks for your reply.

I am currently using Flink 1.16.1 and cannot find any HeaderProvider setter 
method on KafkaRecordSerializationSchemaBuilder.
I did find this support on GitHub: 
https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
but it does not seem to be released yet. Can you please point me towards the 
correct Flink version?
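
In the meantime, the workaround I am considering on 1.16.1 is to implement 
KafkaRecordSerializationSchema directly and attach the headers to the 
ProducerRecord myself, roughly like the sketch below (the topic name and the 
"source" header are just placeholders). Please let me know if there is a 
better approach.

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderAwareStringSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        // Build the headers for this element; "source" is only an example header.
        Headers headers = new RecordHeaders();
        headers.add("source", "flink".getBytes(StandardCharsets.UTF_8));
        // null partition and null key let Kafka choose the partition.
        return new ProducerRecord<>("topic-name", null, null, value, headers);
    }
}

This would then be wired in via setRecordSerializer(new HeaderAwareStringSchema()) 
on the KafkaSink builder.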

Also, any help on question 1 regarding Schema Registry?
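
For context on question 1, what I am hoping for is roughly the following, 
assuming the flink-avro-confluent-registry module is the intended route 
(MyAvroRecord, the subject name and the registry URL are just placeholders):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

KafkaSink<MyAvroRecord> sink = KafkaSink.<MyAvroRecord>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.<MyAvroRecord>builder()
                .setTopic("topic-name")
                // Avro serializer that registers/looks up the writer schema
                // in Confluent Schema Registry
                .setValueSerializationSchema(
                        ConfluentRegistryAvroSerializationSchema.forSpecific(
                                MyAvroRecord.class,
                                "topic-name-value",                // subject
                                "http://schema-registry:8081"))    // registry URL
                .build())
        .build();

Is something like this the recommended way, or is there another documented 
hook in the Kafka sink?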

Regards,
Kirti Dhar

-----Original Message-----
From: Jiabao Sun <jiabao....@xtransfer.cn> 
Sent: 01 February 2024 13:29
To: user@flink.apache.org
Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers

Hi Kirti,

The Kafka Sink supports sending messages with headers.
You need to implement a HeaderProvider that extracts the headers from each input element.


import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.HeaderProvider;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .setHeaderProvider(new HeaderProvider<String>() {
                    @Override
                    public Headers getHeaders(String input) {
                        // Derive the headers from the input element here;
                        // a constant header is used only as an example.
                        Headers headers = new RecordHeaders();
                        headers.add("source", "flink".getBytes(StandardCharsets.UTF_8));
                        return headers;
                    }
                })
                .build()
        )
        .build();

Best,
Jiabao


On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
> 
> I have below queries regarding Flink Kafka Sink.
> 
> 
>   1.  Does Kafka Sink support Schema Registry? If yes, is there any
> documentation on configuring it?
>   2.  Does Kafka Sink support sending messages (ProducerRecord) with headers?
> 
> 
> Regards,
> Kirti Dhar
> 
> 
