Hi, I am trying to read Avro-encoded messages from Kafka whose schemas are registered in the Schema Registry. I am using the class ConfluentRegistryAvroDeserializationSchema (https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html), specifically the method:

static ConfluentRegistryAvroDeserializationSchema<org.apache.avro.generic.GenericRecord> forGeneric(org.apache.avro.Schema schema, String url)

The Javadoc describes it as: "Creates ConfluentRegistryAvroDeserializationSchema that produces GenericRecord using the provided reader schema and looks up the writer schema in the Confluent Schema Registry."

As I understand it, schema is the reader schema and url is the Schema Registry URL used to retrieve the writer schema. I have a few questions:

1. Why do we need the writer schema from the registry? Is it to validate that the reader schema is the same as the writer schema?
2. Since the Schema Registry URL is provided, why do we also need to provide the reader schema? Can the schema be retrieved dynamically at run time from the Avro message metadata and then cached (as the Confluent example below appears to do)?

The Confluent consumer example (https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html) contains the following snippet, where schema.registry.url is provided to the consumer and the message is converted to a GenericRecord by KafkaAvroDeserializer without passing a reader schema.
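
<snip>
import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// Only the registry URL is configured; no reader schema is passed.
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
// Values are GenericRecord, so the consumer and record types must match
// (the snippet as published declares them as String).
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
<snip>

Please let me know if I have missed anything, and whether there is a way to read Avro-encoded messages from Kafka with the Schema Registry without providing a reader schema.

Thanks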
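
P.S. To make question 2 more concrete: as far as I understand, the Confluent serializer prefixes every Avro payload with a magic byte (0) followed by a 4-byte big-endian schema ID, and that ID is what lets KafkaAvroDeserializer fetch the writer schema from the registry at run time. Below is a minimal sketch of reading that ID using only the JDK. The class and method names (ConfluentWireFormat, schemaId, frame) are my own and purely illustrative; they are not part of Flink or the Confluent client.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not a Flink or Confluent class.
final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    private ConfluentWireFormat() {}

    /** Returns the schema ID stored in the header of a Confluent-framed message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short to carry a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Builds a framed message; used here only to produce sample input. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }
}
```

For example, ConfluentWireFormat.schemaId(ConfluentWireFormat.frame(42, payload)) gives back 42 for any payload. So the writer schema seems to be discoverable from the message alone, which is why I am unsure what role the reader schema plays in forGeneric.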