Thanks for sharing your solution, Anil!

Cheers, Fabian

On Tue, Apr 21, 2020 at 9:35 AM Anil K <sendto.ani...@gmail.com> wrote:

> Thanks Fabian,
>
> I ended up using something like below.
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.serialization.SerializationSchema;
> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> {
>
>   private final SerializationSchema<GenericRecord> valueSerializer;
>   private final String topic;
>
>   public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) {
>     // Serializes the record value as Avro, registering/looking up the schema
>     // in the Confluent Schema Registry under the given subject.
>     this.valueSerializer =
>         ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl);
>     this.topic = topic;
>   }
>
>   @Override
>   public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
>     byte[] value = valueSerializer.serialize(element);
>     // No key is set; the record carries only the serialized Avro value.
>     return new ProducerRecord<>(topic, value);
>   }
> }
>
> Then I used a new GenericSerializer instance in the FlinkKafkaProducer:
>
> FlinkKafkaProducer<GenericRecord> producer =
>     new FlinkKafkaProducer<>(
>         topic,
>         new GenericSerializer(topic, schema, schemaRegistryUrl),
>         kafkaConfig,
>         Semantic.AT_LEAST_ONCE);
>
> Thanks, Anil.
>
>
> On Tue, Apr 21, 2020 at 3:34 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Anil,
>>
>> Here's a pointer to Flink's end-to-end test that checks the
>> integration with the schema registry [1].
>> It was recently updated, so I hope it works the same way in Flink 1.9.
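>>
>> For the consuming side, the counterpart wiring looks roughly like the
>> sketch below (just an illustration, not the exact test code; schema,
>> inputTopic, schemaRegistryUrl, kafkaConfig and env are placeholders you
>> would define yourself):
>>
>> DeserializationSchema<GenericRecord> deserializer =
>>     ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl);
>>
>> FlinkKafkaConsumer<GenericRecord> consumer =
>>     new FlinkKafkaConsumer<>(inputTopic, deserializer, kafkaConfig);
>>
>> DataStream<GenericRecord> records = env.addSource(consumer);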
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
>>
>> On Sat, Apr 18, 2020 at 7:17 PM Anil K <sendto.ani...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What is the best way to use the Confluent Schema Registry with
>>> FlinkKafkaProducer?
>>>
>>> What I have right now is as follows.
>>>
>>> SerializationSchema<GenericRecord> serializationSchema =
>>>     ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistryUrl);
>>>
>>> FlinkKafkaProducer<GenericRecord> kafkaProducer =
>>>     new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
>>> outputStream.addSink(kafkaProducer);
>>>
>>> FlinkKafkaProducer with a SerializationSchema is now deprecated.
>>>
>>> I am using Flink 1.9.
>>>
>>> How do I use FlinkKafkaProducer with a KafkaSerializationSchema and the
>>> Confluent Schema Registry?
>>>
>>> Is there some reference/documentation I could use?
>>>
>>> Thanks, Anil.
>>>
>>>
