One or more of the following files ( client code.zip ) violates IBM policy and 
all attachment(s) have been removed from the message.

**********************************************************************
Hi Team/Users,

We are trying to send plain Avro messages to a Kafka topic, but they are not 
being deserialized properly on the Flink side.
I am attaching the client code that sends the Avro messages to the Kafka topic 
called orders.

As of now we are able to successfully send Avro messages to the Kafka topic 
using the Avro library (AVSC schema), and the messages are present in the 
topic (verified using kafka-console-consumer.sh).

However, Flink is not able to deserialize the Avro messages properly.

The Flink SQL used to create the table is as follows:

CREATE TABLE `source_1`
(
    `orderId`                      INT,
    `description`                  STRING,
    `price`                        FLOAT,
    `quantity`                     INT
)
WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka.01:9000',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'avro',
    'properties.security.protocol' = 'PLAINTEXT'
);
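For reference, with 'format' = 'avro' Flink expects each Kafka record value to be the raw Avro binary encoding of one row, against a schema Flink derives from the table schema. Since the columns above are declared without NOT NULL, our understanding is that the derived schema wraps each field in a union ["null", &lt;type&gt;], so every non-null value must be preceded by a union branch index. A minimal Python sketch of that wire layout (this is our assumption about the derived schema, not something we have verified against Flink internals):

```python
import struct

def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: ZigZag, then base-128 varint (little-endian groups)."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_order(order_id: int, description: str, price: float, quantity: int) -> bytes:
    # Each column is nullable in the DDL, so the derived Avro schema (assumed)
    # is a union ["null", <type>]; a non-null value is preceded by the union
    # branch index 1, which ZigZag/varint-encodes to the single byte 0x02.
    NON_NULL = zigzag_varint(1)
    buf = bytearray()
    buf += NON_NULL + zigzag_varint(order_id)            # orderId: int
    raw = description.encode("utf-8")
    buf += NON_NULL + zigzag_varint(len(raw)) + raw      # description: string (length-prefixed)
    buf += NON_NULL + struct.pack("<f", price)           # price: float (4-byte little-endian)
    buf += NON_NULL + zigzag_varint(quantity)            # quantity: int
    return bytes(buf)

payload = encode_order(1, "widget", 9.99, 3)
print(payload.hex())
```

If the producer encodes the fields without those union-branch bytes (i.e. with a writer schema whose fields are non-nullable), the decoder can end up reading a data byte where it expects a union index, which would be consistent with the ArrayIndexOutOfBoundsException in Symbol$Alternative.getSymbol below.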

  *   We consistently see the following exception:
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = orders, partition = 0, leaderEpoch = 0, offset = 40, CreateTime = 1689655077724, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@5eef8c84).
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
    ... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 12345
    at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)

  *   We found a probable solution on Stack Overflow which suggests sending the 
Avro schema as part of every message; we tried this but still hit the same error.
https://stackoverflow.com/questions/66065158/failed-to-deserialize-avro-record-apache-flink-sql-cli
  *   However, this solution contradicts the Flink docs, which state that there 
is no need to send the schema as part of the Avro messages, and which explicitly 
say that defining a schema is not supported.
Link to the docs:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/avro/#format-options
Excerpt from the docs:
Currently, the Avro schema is always derived from table schema. Explicitly 
defining an Avro schema is not supported yet.
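
One possibility we have not yet ruled out: if the producer were serializing with the Confluent Schema Registry wire format (a magic byte plus a 4-byte schema id before the Avro body), the plain 'avro' format would misread those 5 framing bytes; Flink ships a separate 'avro-confluent' format for that layout. A sketch of such a table, where the registry URL is a placeholder:

```sql
CREATE TABLE `source_1`
(
    `orderId`                      INT,
    `description`                  STRING,
    `price`                        FLOAT,
    `quantity`                     INT
)
WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka.01:9000',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://registry:8081',
    'properties.security.protocol' = 'PLAINTEXT'
);
```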

Thanks,
Girish
