Hi Shamit,

Thanks for reaching out to the community. I am pulling in Timo, who might
know more about this problem.
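
One thing that might be worth checking in the meantime: the plain 'avro'
format expects bare Avro bytes, whereas the Confluent serializers put a magic
byte and a 4-byte schema ID in front of the Avro payload. That mismatch would
be consistent with the ArrayIndexOutOfBoundsException in your stack trace.
Below is an untested sketch, not a confirmed fix. It assumes Flink 1.12, that
the key is written as a raw 8-byte long (as your LongDeserializer setting
suggests), and that the 'avro-confluent' options from your working "kafka"
table carry over with a 'value.' prefix. Please check the option names against
the documentation for your Flink version.

```sql
-- Sketch only: option names are assumptions based on the Flink 1.12 docs.
CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING,
  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'lndcdcadsprpslproposalline',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'dd',
  -- Assumption: the key is a plain big-endian long, not Avro.
  'key.format' = 'raw',
  -- Assumption: the value uses the Confluent wire format.
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
  -- A subject option should only matter when writing to the topic.
)
```

If the key turns out to be Confluent Avro as well, the same pattern with a
'key.' prefix might apply.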

Cheers,
Till

On Sun, Feb 7, 2021 at 6:22 AM shamit jain <jainsha...@gmail.com> wrote:

> Hello Team,
>
> I am facing an issue with the "upsert-kafka" connector, which should read
> Avro messages that are deserialized with
> "io.confluent.kafka.serializers.KafkaAvroDeserializer". The same works
> with the "kafka" connector.
>
> It looks like we cannot pass the schema registry URL and subject name
> the way we do with the "kafka" connector.
>
> Please help.
>
>
> Table definition with upsert-kafka is below (not working):
>
> CREATE TABLE proposalLine (
>   PROPOSAL_LINE_ID BIGINT,
>   LAST_MODIFIED_BY STRING,
>   PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.auto.offset.reset' = 'earliest',
>   'topic' = 'lndcdcadsprpslproposalline',
>   'key.format' = 'avro',
>   'value.format' = 'avro',
>   'properties.group.id' = 'dd',
>   'properties.schema.registry.url' = 'http://localhost:8081',
>   'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
>   'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
> )
>
> ERROR:
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>     at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
>     ... 9 more
>
>
> Table definition with kafka connector is below (working):
>
> CREATE TABLE proposalLine (
>   PROPOSAL_LINE_ID BIGINT,
>   LAST_MODIFIED_BY STRING
> ) WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.auto.offset.reset' = 'earliest',
>   'topic' = 'lndcdcadsprpslproposalline',
>   'format' = 'avro-confluent',
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
> )
>
> Regards,
> Shamit
