Hi,

Yes, you are right: the schema passed to forGeneric is the reader schema
and, at the same time, the schema that Flink works with in the pipeline.
It is the schema used to serialize and deserialize records between
different TaskManagers. Between the Flink TaskManagers, that schema
plays the role of both the reader and the writer schema.

Avro always needs both the writer and the reader schema to deserialize
data. If you provide just the writer schema, the reader schema is
assumed to be the same. Without the writer schema it is not possible to
deserialize Avro data. See this extract from the Avro spec [1]:

    Binary encoded Avro data does not include type information or field
    names. The benefit is that the serialized data is small, but as a
    result a schema must always be used in order to read Avro data
    correctly. The best way to ensure that the schema is structurally
    identical to the one used to write the data is to use the exact same
    schema.

    Therefore, files or systems that store Avro data should always
    include the writer's schema for that data. Avro-based remote
    procedure call (RPC) systems must also guarantee that remote
    recipients of data have a copy of the schema used to write that
    data. In general, it is advisable that any reader of Avro data
    should use a schema that is the same (as defined more fully in
    Parsing Canonical Form for Schemas
    <https://avro.apache.org/docs/1.10.2/spec.html#Parsing+Canonical+Form+for+Schemas>)
    as the schema that was used to write the data in order to
    deserialize it correctly. Deserializing data into a newer schema is
    accomplished by specifying an additional schema, the results of
    which are described in Schema Resolution
    <https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution>.
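
To make the reader/writer distinction concrete, here is a toy sketch in
plain Java. It is not the Avro API and it does not use Avro's real
binary encoding; the class SchemaResolutionSketch, the Field type and
the write/read helpers are all made up for illustration. It only shows
the idea: the bytes carry no field names, so the writer schema is needed
to decode them, and the reader schema then decides what the record looks
like afterwards (here a missing field is filled from a default).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Avro API and NOT Avro's real binary encoding.
class SchemaResolutionSketch {

    enum Type { INT, STRING }

    // A field has a name, a type, and a default that a reader may fall back to.
    static final class Field {
        final String name;
        final Type type;
        final Object defaultValue;

        Field(String name, Type type, Object defaultValue) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    // Writes the values in writer-schema order, without names or type tags.
    static byte[] write(List<Field> writerSchema, Map<String, Object> rec) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (Field f : writerSchema) {
                Object value = rec.get(f.name);
                if (f.type == Type.INT) {
                    out.writeInt((Integer) value);
                } else {
                    out.writeUTF((String) value);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes using the writer schema (it describes the byte layout), then
    // projects the result onto the reader schema.
    static Map<String, Object> read(List<Field> writerSchema, List<Field> readerSchema, byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, Object> asWritten = new LinkedHashMap<>();
            for (Field f : writerSchema) {
                if (f.type == Type.INT) {
                    asWritten.put(f.name, in.readInt());
                } else {
                    asWritten.put(f.name, in.readUTF());
                }
            }
            Map<String, Object> resolved = new LinkedHashMap<>();
            for (Field f : readerSchema) {
                // Fields unknown to the writer are filled from the reader's default.
                resolved.put(f.name, asWritten.containsKey(f.name) ? asWritten.get(f.name) : f.defaultValue);
            }
            return resolved;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a record with an older schema and reads it with a newer one.
    static Map<String, Object> demo() {
        List<Field> writerV1 = Arrays.asList(
                new Field("id", Type.INT, null),
                new Field("name", Type.STRING, null));
        List<Field> readerV2 = Arrays.asList(
                new Field("id", Type.INT, null),
                new Field("name", Type.STRING, null),
                new Field("country", Type.STRING, "unknown"));

        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("id", 7);
        rec.put("name", "ann");

        return read(writerV1, readerV2, write(writerV1, rec));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {id=7, name=ann, country=unknown}
    }
}
```

Decoding the same bytes with readerV2 as if it were the writer schema
would fail in this toy model, because the decoder would try to read a
third field that was never written.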

Now to the question of why we need to pass a schema as a parameter to
the forGeneric method. For the DeserializationSchema, i.e. when reading
e.g. from Kafka, it is indeed used as the reader schema. However, as I
said earlier, it is also used when serializing between Flink's
TaskManagers. In this scenario it is used both as the writer and as the
reader schema. The design here is that we do not want to query the
schema registry from every TaskManager. You can say that we read
multiple different schema versions from e.g. Kafka and normalize them to
the provided schema, which is the one required across the pipeline.

The reason why you don't need to provide any schema to the
KafkaAvroDeserializer is that it is only ever used in a single parallel
instance and its output is not sent over the network again. So basically
there you use the writer schema retrieved from the schema registry as
the reader schema.
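
As a side note on how the writer schema can be found at all: messages
produced with the Confluent serializers are, as far as I know, framed as
one magic byte (0), a 4-byte big-endian schema ID, and then the Avro
binary payload. The deserializer uses that ID to look up the writer
schema in the registry. A minimal sketch of reading the ID, using only
the JDK; the class and method names are made up for illustration and the
registry lookup itself is left out:

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Flink or the Confluent client library.
class ConfluentHeaderSketch {

    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID stored in bytes 1..4 (big-endian) of a framed message.
    static int schemaId(byte[] message) {
        if (message.length < 5 || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a Confluent-framed Avro message");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Builds a framed message; used here only to exercise schemaId.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(5 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    public static void main(String[] args) {
        byte[] message = frame(42, new byte[] {0x0e});
        System.out.println(schemaId(message)); // prints 42
    }
}
```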

I hope this answers your questions.

Best,

Dawid

[1] https://avro.apache.org/docs/1.10.2/spec.html

On 09/07/2021 03:09, M Singh wrote:
> Hi:
>
> I am trying to read avro encoded messages from Kafka with schema
> registered in schema registry.
>
> I am using the class ConfluentRegistryAvroDeserializationSchema
> (https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html)
> using the method:
>
> static ConfluentRegistryAvroDeserializationSchema<org.apache.avro.generic.GenericRecord>
> forGeneric(...)
>
>
> The arguments for this method are:
>
> forGeneric(org.apache.avro.Schema schema, String url)
>
> Creates ConfluentRegistryAvroDeserializationSchema that produces
> GenericRecord using the provided reader schema and looks up the
> writer schema in the Confluent Schema Registry.
>
> As I understand, the schema is the reader schema and the url is the
> schema registry url used to retrieve writer schema.  
>
> I have a few questions:
>
> 1. Why do we need the writer schema from the registry ?  Is it to
> validate that the reader schema is same as writer schema ?
> 2. Since the url of the schema registry is provided, why do we need to
> provide the reader schema ? Can the schema be retrieved at run time
> from the avro message metadata dynamically and then cache it (as shown
> in the example snippet from confluent below) ? 
>
> The confluent consumer example
> (https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html)
> has the following example snippet where the schema.registry.url is
> provided to the consumer and the message can be converted to generic
> record using the KafkaAvroDeserializer without the need to pass the
> reader schema.
>
> <snip>
>
> Properties props = new Properties();
>
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
>
>
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "io.confluent.kafka.serializers.KafkaAvroDeserializer");
> props.put("schema.registry.url", "http://localhost:8081");
>
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> String topic = "topic1";
> final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(Arrays.asList(topic));
>
> try {
>   while (true) {
>     ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
>     for (ConsumerRecord<String, GenericRecord> record : records) {
>       System.out.printf("offset = %d, key = %s, value = %s \n",
>           record.offset(), record.key(), record.value());
>     }
>   }
> } finally {
>   consumer.close();
> }
>
>
> <snip>
>
> Please let me know if I have missed anything and there is a way to
> read avro encoded messages from kafka with schema registry without
> requiring reader schema.
>
> Thanks
>
>
>
>
>
