Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-21 Thread Anil K
Thanks Fabian,

I ended up using something like below.

public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> {

  private final SerializationSchema<GenericRecord> valueSerializer;
  private final String topic;

  public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) {
    this.valueSerializer =
        ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl);
    this.topic = topic;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
    byte[] value = valueSerializer.serialize(element);
    return new ProducerRecord<>(topic, value);
  }
}
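One detail worth double-checking (an editorial aside, not part of the thread): the first argument of ConfluentRegistryAvroSerializationSchema.forGeneric is the registry *subject*, not the topic. Confluent's default TopicNameStrategy registers value schemas under "<topic>-value", so passing the bare topic name registers the schema under a different subject than the standard Confluent Kafka serializers would use. A tiny hypothetical helper for the default naming:

```java
public class Subjects {
    // Confluent's default TopicNameStrategy: value schemas are registered
    // under the subject "<topic>-value" (keys under "<topic>-key").
    public static String valueSubject(String topic) {
        return topic + "-value";
    }

    public static void main(String[] args) {
        System.out.println(valueSubject("events")); // events-value
    }
}
```

If the consumers use the plain Confluent deserializers, passing valueSubject(topic) instead of the bare topic keeps both sides looking up the same subject.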

Then I used a new GenericSerializer instance in the FlinkKafkaProducer:

FlinkKafkaProducer<GenericRecord> producer =
    new FlinkKafkaProducer<>(topic, new GenericSerializer(topic, schema, schemaRegistryUrl),
        kafkaConfig, Semantic.AT_LEAST_ONCE);

Thanks, Anil.




Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-21 Thread Fabian Hueske
Thanks for sharing your solution, Anil!

Cheers, Fabian



Re: FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-20 Thread Fabian Hueske
Hi Anil,

Here's a pointer to Flink's end-to-end test that checks the integration
with the schema registry [1].
It was recently updated, so I hope it works the same way in Flink 1.9.

Best,
Fabian

[1]
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java

On Sat., Apr. 18, 2020 at 19:17, Anil K wrote:

> Hi,
>
> What is the best way to use Confluent SchemaRegistry with
> FlinkKafkaProducer?
>
> What I have right now is as follows.
>
> SerializationSchema<GenericRecord> serializationSchema =
>     ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema,
>         schemaRegistryUrl);
>
> FlinkKafkaProducer<GenericRecord> kafkaProducer =
>     new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
> outputStream.addSink(kafkaProducer);
>
> FlinkKafkaProducer with SerializationSchema is now deprecated.
>
> I am using flink 1.9.
>
> How do I use FlinkKafkaProducer with KafkaSerializationSchema and the
> Confluent SchemaRegistry?
>
> Is there some reference/documentation I could use?
>
> Thanks, Anil.
>
>