Re: Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-15 Thread M Singh
Thanks so much Dawid for your time and explanations.

Mans
On Thursday, July 15, 2021, 10:09:23 AM EDT, Dawid Wysakowicz wrote:
Ad. 1/2/4: Please refer to Avro's documentation on how the reader and writer 
schemas work, e.g. here [1] and here [2]. That's purely Avro's thing.
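
(For illustration, a minimal self-contained sketch of that resolution in plain 
Avro; the two inline schemas are made up for the example, with the reader 
schema adding a defaulted "email" field on top of the writer schema:)

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ResolutionDemo {
  public static void main(String[] args) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"long\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    // Write a record with the (older) writer schema.
    GenericRecord user = new GenericData.Record(writerSchema);
    user.put("id", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(user, encoder);
    encoder.flush();

    // Read it back with BOTH schemas: Avro resolves the difference and
    // fills the missing "email" field with its default.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder =
        DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    System.out.println(reader.read(null, decoder)); // {"id": 42, "email": null}
  }
}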
 
Ad. 3: Theoretically yes. The problem is the communication between TaskManagers: 
we need to serialize the Avro records somehow. The chosen approach is to use a 
single version of the schema in a pipeline and thus avoid querying the schema 
registry from each node/operator. The channels between TaskManagers are 
transient, so we do not need to work with different versions of the schema.
 
 
Another approach could be to store the schema along with the records, but we do 
not want to do that for performance reasons. That's why we use the provided 
schema, which is distributed along with the operators during scheduling, for 
both writing and reading.
 
Ad. 5: Yes, between TMs we use it as both the reader and the writer schema. 
However, the source (usually Kafka) is usually permanent storage, in which you 
most probably store records with different schemas alongside each other 
(because the schema may evolve over time), and thus you need the schema 
registry.
 
Best,
 
Dawid
 
 
[1] https://avro.apache.org/docs/1.10.2/spec.html#Data+Serialization+and+Deserialization
 
[2] https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution
 

Re: Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-15 Thread M Singh
Hello Dawid:

Thanks for your answers and references.

I do have a few questions:

1. Is there any scenario where the reader and writer schema should differ?

2. How is a mismatch b/w the two schemas (one passed as argument and the other 
retrieved from the schema registry) resolved at run time?

3. As mentioned - "If you provide just the writer schema, the reader schema is 
assumed to be the same." - Is it possible in Flink to just use the schema 
registry to retrieve the schema, which can then be used for both 
reading/writing?

4. Regarding "You can say that we read multiple different versions from e.g. 
Kafka and normalize it to that provided schema that's required across the 
pipeline." - Does this mean that if the reader (passed as argument) and writer 
(retrieved from the registry) schemas differ then Flink will normalize the 
differences? If so, are there any guidelines as to how the fields are 
normalized?

5. Regarding: "the schema in the forGeneric is the readerSchema and at the same 
time the schema that Flink will be working with in the pipeline. It will be the 
schema used to serialize and deserialize records between different 
TaskManagers" - If the reader schema (passed as argument) is used for reading 
and writing b/w TaskManagers, then what role does the schema from the registry 
play? Does it have to do with the "normalization" you've mentioned?
Thanks again for your time.
Mans
On Tuesday, July 13, 2021, 10:22:32 AM EDT, Dawid Wysakowicz wrote:
Hi,
 
Yes, you are right: the schema in the forGeneric is the readerSchema and, at 
the same time, the schema that Flink will be working with in the pipeline. It 
will be the schema used to serialize and deserialize records between different 
TaskManagers. Between the Flink TaskManagers that schema plays the role of both 
the reader and the writer schema.
 
The way that Avro works is that you must provide both the writer and the reader 
schema; otherwise it simply does not work. If you provide just the writer 
schema, the reader schema is assumed to be the same. Without the writer schema 
it is not possible to deserialize Avro. See this extract from the Avro spec [1]:
 
 
Binary encoded Avro data does not include type information or field names. The 
benefit is that the serialized data is small, but as a result a schema must 
always be used in order to read Avro data correctly. The best way to ensure 
that the schema is structurally identical to the one used to write the data is 
to use the exact same schema.
   
Therefore, files or systems that store Avro data should always include the 
writer's schema for that data. Avro-based remote procedure call (RPC) systems 
must also guarantee that remote recipients of data have a copy of the schema 
used to write that data. In general, it is advisable that any reader of Avro 
data should use a schema that is the same (as defined more fully in Parsing 
Canonical Form for Schemas) as the schema that was used to write the data in 
order to deserialize it correctly. Deserializing data into a newer schema is 
accomplished by specifying an additional schema, the results of which are 
described in Schema Resolution.
 
 
Now to the question of why we need to pass a schema as a parameter to the 
forGeneric method. For the DeserializationSchema, i.e. when reading e.g. from 
Kafka, it is indeed used as the reader schema. However, as I said earlier, it 
is also used when serializing between Flink's TaskManagers, and in this 
scenario it is used both as the writer and as the reader schema. The design 
here is that we do not want to query the schema registry from every 
TaskManager. You can say that we read multiple different versions from e.g. 
Kafka and normalize them to the provided schema that's required across the 
pipeline.
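
(A minimal sketch of that wiring, using the FlinkKafkaConsumer source; the 
topic name, registry URL and the tiny inline reader schema are placeholders 
for this example:)

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadAvroFromRegistry {
  public static void main(String[] args) throws Exception {
    // Reader schema: fixed for the whole pipeline. Writer schemas for
    // individual records are looked up in the registry and resolved
    // against this one.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"long\"}]}");

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group1");

    FlinkKafkaConsumer<GenericRecord> source = new FlinkKafkaConsumer<>(
        "topic1",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://localhost:8081"),
        props);

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(source).print();
    env.execute("read-avro-from-registry");
  }
}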
 
The reason why you don't need to provide any schema to the KafkaAvroDeserializer 
is that it is only ever used in a single parallel instance, and the record is 
not sent over the network again. So basically, there you use the writer schema 
retrieved from the schema registry as the reader schema.
 
I hope this answers your questions.
 
Best,
 
Dawid
 
 
[1] https://avro.apache.org/docs/1.10.2/spec.html
 
Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-08 Thread M Singh
Hi:
I am trying to read Avro-encoded messages from Kafka, with the schema 
registered in the schema registry.

I am using the class ConfluentRegistryAvroDeserializationSchema
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html)
via the method:

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(...)
The arguments for this method are:

forGeneric(org.apache.avro.Schema schema, String url) - Creates a 
ConfluentRegistryAvroDeserializationSchema that produces GenericRecord using 
the provided reader schema and looks up the writer schema in the Confluent 
Schema Registry.
As I understand it, the schema is the reader schema and the url is the schema 
registry URL used to retrieve the writer schema.
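
(For concreteness, a sketch of the call in question; the .avsc file name and 
registry URL here are placeholders:)

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

Schema readerSchema = new Schema.Parser().parse(new File("user.avsc"));
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        readerSchema, "http://localhost:8081");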
I have a few questions:

1. Why do we need the writer schema from the registry? Is it to validate that 
the reader schema is the same as the writer schema?

2. Since the url of the schema registry is provided, why do we need to provide 
the reader schema? Can the schema be retrieved at run time from the Avro 
message metadata dynamically and then cached (as shown in the example snippet 
from Confluent below)?
The Confluent consumer example 
(https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html)
has the following snippet, where the schema.registry.url is provided to the 
consumer and the message can be converted to a GenericRecord using the 
KafkaAvroDeserializer, without the need to pass the reader schema.

import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n",
          record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}


Please let me know if I have missed anything, and whether there is a way to 
read Avro-encoded messages from Kafka with the schema registry without 
requiring the reader schema.
Thanks