Hi Team,

This is regarding the Flink Kafka Sink. We have a use case where our records carry 
Kafka headers and both the key and the value use Avro schemas.

Below is what we would intuitively expect the API to look like for an Avro Kafka key and value:

KafkaSink.<ProducerRecord<Key, Value>>builder()
        .setBootstrapServers(cloudkafkaBrokerAPI)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setKeySerializationSchema(
                                ConfluentRegistryAvroSerializationSchema
                                        .forSpecific(Key.class, "Key", cloudSchemaRegistryURL))
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema
                                        .forSpecific(Value.class, "val", cloudSchemaRegistryURL))
                        .setTopic(outputTopic)
                        .build())
        .build();

From what I understand, the KafkaSink currently does not accept both key and value 
as Avro schemas like this; it only accepts a single element type for the sink.

First I tried the deprecated FlinkKafkaProducer by implementing 
KafkaSerializationSchema and supplying the Avro serializer/deserializer via producer properties:

cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
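Roughly, the wiring looked like the sketch below. KeyValueAndHeaders is a placeholder 
for our own wrapper type that carries the already-Avro-encoded key and value bytes plus 
the Kafka headers; the exact names differ in our code, so treat this as an approximation 
rather than the real thing:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// KeyValueAndHeaders (placeholder) holds keyBytes, valueBytes and headers.
public class KeyValueSerializationSchema
        implements KafkaSerializationSchema<KeyValueAndHeaders> {

    private final String topic;

    public KeyValueSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(KeyValueAndHeaders element, Long timestamp) {
        // The interface forces byte[] key and value here, so the Avro records are
        // already plain bytes before the producer-side serializer ever runs.
        return new ProducerRecord<>(
                topic,
                null,                 // let Kafka pick the partition
                element.keyBytes,     // Avro-encoded key
                element.valueBytes,   // Avro-encoded value
                element.headers);     // Kafka headers
    }
}

and the (deprecated) producer is created with the properties above:

FlinkKafkaProducer<KeyValueAndHeaders> producer = new FlinkKafkaProducer<>(
        outputTopic,
        new KeyValueSerializationSchema(outputTopic),
        cloudKafkaProperties,   // includes the KafkaAvroSerializer and schema.registry.url settings
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);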


The problem is that this example runs, but the schema that gets stored in the 
Confluent schema registry is:
{
    "subject": "ddp_out-key",
    "version": 1,
    "id": 1,
    "schema": "\"bytes\""
}

Instead of the full Avro schema, it has registered the whole key as just "bytes", 
presumably because KafkaSerializationSchema already hands the producer a 
ProducerRecord<byte[], byte[]>, so the KafkaAvroSerializer only ever sees raw bytes. 
So for now I am looking for a solution that makes this work without the KafkaSink, and 
I would also like to know whether there is a feature request on the roadmap to add 
ProducerRecord support to the KafkaSink itself, as that would be ideal: the previous 
operator could emit a ProducerRecord with key, value and headers, and the sink would 
simply forward it (see the sketch below for what I have in mind).
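To make the ask concrete, below is a rough sketch of the kind of record serializer I 
have in mind, which I could probably hand-roll against the KafkaSink for now but which 
would be much nicer as built-in support. Key and Value stand in for our Avro 
SpecificRecord classes, and the subject names and topic are placeholders, so this is 
only a sketch of the idea, not tested code:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordSerializationSchema
        implements KafkaRecordSerializationSchema<ProducerRecord<Key, Value>> {

    private final String topic;
    private final SerializationSchema<Key> keySchema;
    private final SerializationSchema<Value> valueSchema;

    public ProducerRecordSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        // Registers the real Avro schemas (not "bytes") under the given subjects.
        this.keySchema = ConfluentRegistryAvroSerializationSchema
                .forSpecific(Key.class, topic + "-key", schemaRegistryUrl);
        this.valueSchema = ConfluentRegistryAvroSerializationSchema
                .forSpecific(Value.class, topic + "-value", schemaRegistryUrl);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            ProducerRecord<Key, Value> element, KafkaSinkContext context, Long timestamp) {
        // Serialize key and value in the Confluent Avro format and copy the
        // headers straight from the upstream ProducerRecord.
        return new ProducerRecord<>(
                topic,
                null,
                element.key() == null ? null : keySchema.serialize(element.key()),
                valueSchema.serialize(element.value()),
                element.headers());
    }
}

The job would then simply use it:

KafkaSink<ProducerRecord<Key, Value>> sink = KafkaSink.<ProducerRecord<Key, Value>>builder()
        .setBootstrapServers(cloudkafkaBrokerAPI)
        .setRecordSerializer(new ProducerRecordSerializationSchema(outputTopic, cloudSchemaRegistryURL))
        .build();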

-Jay
GEHC
