Hi Shamit,

I think the main problem is the wrong usage of the upsert kafka ddl. In the
ddl, you use avro as the format rather than avro-confluent.

The dev mail list is used to discuss implementation details. Please send
emails to user mail list for help.

[1] https://flink.apache.org/gettinghelp.html#user-mailing-list

Shamit <jainsha...@gmail.com> 于2021年2月7日周日 下午1:21写道:

> Hello Team,
>
> I am facing issue with "upsert-kafka" connector which should read the Avro
> message serialized using
> "io.confluent.kafka.serializers.KafkaAvroDeserializer". Same is working
> with
> "kafka" connector.
>
> Looks like we are not able to pass the schema registry url and subject name
> like the way we are passing while using "kafka" connector.
>
> Please help.
>
>
> Table definition with upsert-kafka is below (not working),
>
>                 CREATE TABLE proposalLine (PROPOSAL_LINE_ID
> bigint,LAST_MODIFIED_BY STRING ,PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
> ) "WITH ('connector' = 'upsert-kafka', 'properties.bootstrap.servers' =
> 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' =
> 'lndcdcadsprpslproposalline', 'key.format' = 'avro', 'value.format' =
> 'avro', 'properties.group.id'='dd', 'properties.schema.registry.url'='
> http://localhost:8081',
>
> 'properties.key.deserializer'='org.apache.kafka.common.serialization.LongDeserializer',
>
> 'properties.value.deserializer'='io.confluent.kafka.serializers.KafkaAvroDeserializer')
>
> ERROR:
>      Caused by: java.io.IOException: Failed to deserialize Avro record.
>         at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
>         at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>         at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>         at
>
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
>         at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>         at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>         at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>         at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>         at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>         at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>         at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>         at
>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>         at
>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>         at
>
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>         at
>
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>         at
>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>         at
>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>         at
>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>         at
>
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
>         at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
>         ... 9 more
>
>
> Table definition with kafka connector is below (working),
> CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY String
> )
> WITH ('connector' = 'kafka', 'properties.bootstrap.servers' =
> 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' =
> 'lndcdcadsprpslproposalline',
> 'format'='avro-confluent','avro-confluent.schema-registry.url' = '
> <http://localhost:8081'>  http://localhost:8081',
> 'avro-confluent.schema-registry.subject' =
> 'lndcdcadsprpslproposalline-value')
>
> Regards,
> Shamit <http://localhost:8081'>
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>

Reply via email to