Hi,

I am trying to read Avro-encoded messages from Kafka, with the schema registered in the Schema Registry.
I am using the ConfluentRegistryAvroDeserializationSchema class
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html)
via its method:

static ConfluentRegistryAvroDeserializationSchema<org.apache.avro.generic.GenericRecord> forGeneric(org.apache.avro.Schema schema, String url)


The Javadoc for this method says:

forGeneric(org.apache.avro.Schema schema, String url)
Creates ConfluentRegistryAvroDeserializationSchema that produces GenericRecord
using the provided reader schema and looks up the writer schema in the
Confluent Schema Registry.

As I understand it, schema is the reader schema, and url is the Schema
Registry URL used to retrieve the writer schema.
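For context, this is roughly how I am wiring it up in my job (the record schema, topic name, and addresses below are placeholders, not my real values):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AvroFromRegistryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema supplied up front (placeholder record);
        // the writer schema is looked up in the registry at runtime.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
            + "[{\"name\":\"f1\",\"type\":\"string\"}]}");

        DeserializationSchema<GenericRecord> deser =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://localhost:8081");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "group1");

        env.addSource(new FlinkKafkaConsumer<>("topic1", deser, props))
           .print();

        env.execute("avro-from-registry");
    }
}
```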
I have a few questions:
1. Why do we need the writer schema from the registry? Is it to validate that
the reader schema is the same as the writer schema?
2. Since the URL of the Schema Registry is provided, why do we need to provide
the reader schema? Can the schema be retrieved dynamically at run time from
the Avro message metadata and then cached (as in the Confluent example snippet
below)?
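If I understand correctly, the "message metadata" here is the Confluent wire format: each serialized message starts with a 5-byte header (magic byte 0x0 followed by the writer schema's registry ID as a big-endian int), which is what the deserializer uses for the registry lookup. A minimal stand-alone sketch of reading that header (class name and sample bytes are hypothetical):

```java
import java.nio.ByteBuffer;

// Sketch of parsing the Confluent wire-format header:
// byte 0       -> magic byte (0x0)
// bytes 1..4   -> writer schema ID (big-endian int)
// bytes 5..    -> Avro binary payload
public class WireFormat {
    public static int schemaId(byte[] message) {
        if (message.length < 5 || message[0] != 0) {
            throw new IllegalArgumentException("Not Confluent Avro wire format");
        }
        // ByteBuffer is big-endian by default, matching the wire format.
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Hypothetical message: magic byte, schema ID 42, then payload bytes.
        byte[] msg = {0, 0, 0, 0, 42, 1, 2};
        System.out.println("writer schema id = " + schemaId(msg));
        // prints: writer schema id = 42
    }
}
```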
The Confluent consumer example
(https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html)
has the following snippet, where schema.registry.url is passed to the consumer
and messages are deserialized to GenericRecord by the KafkaAvroDeserializer
without a reader schema being supplied.
<snip>
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
final Consumer<String, GenericRecord> consumer =
    new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n",
          record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}
<snip>
Please let me know if I have missed anything, and whether there is a way to
read Avro-encoded messages from Kafka with the Schema Registry without
supplying a reader schema.
Thanks



