Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema
Thanks Fabian,

I ended up using something like the following.

    public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> {

        private final SerializationSchema<GenericRecord> valueSerializer;
        private final String topic;

        public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) {
            this.valueSerializer =
                ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl);
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
            byte[] value = valueSerializer.serialize(element);
            return new ProducerRecord<>(topic, value);
        }
    }

Then I passed a new GenericSerializer to the FlinkKafkaProducer:

    FlinkKafkaProducer<GenericRecord> producer =
        new FlinkKafkaProducer<>(topic, new GenericSerializer(topic, schema, schemaRegistryUrl),
            kafkaConfig, Semantic.AT_LEAST_ONCE);

Thanks,
Anil.
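For readers wondering what ends up in the `byte[] value` that the GenericSerializer in this thread puts into the ProducerRecord: as far as I understand, the Confluent registry serializers use the Confluent wire format, i.e. a zero "magic" byte, then the 4-byte big-endian schema ID, then the Avro binary payload. Below is a minimal, dependency-free sketch of that framing. The class `ConfluentFraming` and its method names are made up for illustration; they are not part of Flink or the Confluent client, and the payload bytes in the usage note are arbitrary.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the Confluent wire format; not a Flink or Confluent API. */
final class ConfluentFraming {
    private static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema ID
    private static final int HEADER_SIZE = 5;

    private ConfluentFraming() {}

    /** Prepends the magic byte and the big-endian schema ID to an Avro-encoded payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        // ByteBuffer writes ints in big-endian order by default.
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    /** Reads the schema ID from a framed message, rejecting anything without the header. */
    static int schemaId(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (framed.length < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Message is not in Confluent wire format");
        }
        return buffer.getInt();
    }

    /** Returns the Avro payload that follows the 5-byte header. */
    static byte[] payload(byte[] framed) {
        schemaId(framed); // validates the header first
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }
}
```

For example, `ConfluentFraming.frame(42, new byte[] {2, 6})` yields a 7-byte array, and `ConfluentFraming.schemaId(...)` on that array returns 42. This can be handy when checking with a plain byte-array consumer whether the records written by the producer carry the expected schema ID.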
Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema
Thanks for sharing your solution, Anil!

Cheers,
Fabian
Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema
Hi Anil,

Here's a pointer to Flink's end-to-end test that checks the integration with the schema registry [1].
It was recently updated, so I hope it works the same way in Flink 1.9.

Best,
Fabian

[1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java

On Sat, Apr 18, 2020 at 7:17 PM, Anil K wrote:

> Hi,
>
> What is the best way to use Confluent SchemaRegistry with FlinkKafkaProducer?
>
> What I have right now is as follows.
>
>     SerializationSchema<GenericRecord> serializationSchema =
>         ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistryUrl);
>
>     FlinkKafkaProducer<GenericRecord> kafkaProducer =
>         new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
>     outputStream.addSink(kafkaProducer);
>
> The FlinkKafkaProducer constructor that takes a SerializationSchema is now deprecated.
>
> I am using Flink 1.9.
>
> How do I use FlinkKafkaProducer with KafkaSerializationSchema and the Confluent Schema Registry?
>
> Is there some reference/documentation I could use?
>
> Thanks,
> Anil.