Hi Agnelo,

How is the writer schema encoded if you are using no schema registry? Or
phrased differently: how does Flink know with which schema the data has
been written so that it can map it to the new schema?
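
To show why I am asking, here is a minimal sketch (not Flink's code) that hand-rolls the Avro binary encoding for a record of string fields. It assumes only what the Avro specification says: a record is its fields concatenated in schema order, and a string is a zigzag varint length followed by UTF-8 bytes. The helper names (encode_record, decode_record) are made up for illustration.

```python
import io


def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro encodes a long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def read_long(buf: io.BytesIO) -> int:
    """Decode one zigzag varint from the buffer."""
    shift = 0
    result = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("ran out of bytes while reading a long")
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def encode_record(values):
    """Encode a record of string fields: just the field values back to back."""
    out = bytearray()
    for value in values:
        data = value.encode("utf-8")
        out += zigzag_varint(len(data)) + data
    return bytes(out)


def decode_record(payload, field_names):
    """Decode string fields in the given order; also return any unread bytes."""
    buf = io.BytesIO(payload)
    record = {}
    for name in field_names:
        length = read_long(buf)
        record[name] = buf.read(length).decode("utf-8")
    return record, buf.read()


# The producer writes three fields; no field names or schema end up in the bytes.
payload = encode_record(["a", "b", "c"])
print(payload)

# Reading with the writer's field list consumes the whole payload.
print(decode_record(payload, ["field1", "field2", "field3"]))

# Reading with the old two-field list leaves bytes that the reader cannot interpret.
print(decode_record(payload, ["field1", "field2"]))
```

The payload carries only values, so a reader that does not know the writer schema has nothing to resolve against its own schema. The sketch says nothing about how Flink's avro format reacts to such a mismatch; it only shows what is (and is not) in the bytes.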

On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <agnelo.m.dco...@gmail.com>
wrote:

> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
> Kafka topics. The format is Avro (not Confluent Avro), and we do not use a
> schema registry.
>
> In Flink 1.11 we used to specify the schema; in 1.12, however, the schema is
> derived from the message itself.
>
> Is it possible for the producers to start sending new fields without
> changes in the Flink app?
>
> For example (field3 is the new field):
>
> {
>   "name": "topic1",
>   "type": "record",
>   "fields": [
>     {
>       "name": "field1",
>       "type": "string"
>     },
>     {
>       "name": "field2",
>       "type": "string"
>     },
>     {
>       "name": "field3",
>       "type": "string"
>     }
>   ]
> }
>
> Flink table has:
>
> CREATE TABLE topic1 (
>   field1 STRING NOT NULL,
>   field2 STRING NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic1',
>   'scan.startup.mode' = 'latest-offset',
>   'properties.group.id' = 'topic1',
>   'properties.bootstrap.servers' = 'localhost:8082',
>   'properties.enable.auto.commit' = 'true',
>   'format' = 'avro'
> );
>
> With the above settings I get a deserialization error:
>
> java.io.IOException: Failed to deserialize Avro record.
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>         at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
