UFOXD commented on issue #6955:
URL: https://github.com/apache/seatunnel/issues/6955#issuecomment-2151644714

           at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
           at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
           at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
           at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
           at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
           at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
           at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
           at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
           at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
           at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
           at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
           at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
           at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = test1, partition = 0, leaderEpoch = 0, offset = 126, CreateTime = 1717660426947, serialized key size = -1, serialized value size = 135, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@3168a118).
           at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
           at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
           ... 14 more
   Caused by: java.io.IOException: Corrupt Canal JSON message '{"data":{"runoob_id":74,"runoob_title":"学习 PHP","runoob_author":"菜鸟教程4444","submission_date":"2024-06-06"},"type":"INSERT"}'.
           at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:285)
           at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
           at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
           ... 15 more
   Caused by: org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: data.
           at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:354)
           at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:380)
           at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
           at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:228)
           ... 17 more
   Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
           at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createArrayConverter$94141d67$1(JsonToRowDataConverters.java:300)
           at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:380)
           at org.apache.flink.formats.json.JsonToRowDataConverters.convertField(JsonToRowDataConverters.java:370)
           at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:350)
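
Judging only by the innermost cause (an `ObjectNode` that cannot be cast to `ArrayNode` inside `createArrayConverter`, reported at field `data`), the deserializer appears to expect `data` to be a JSON array, while the message in the trace carries a single JSON object. The sketch below is not part of the original report. It uses only the Python standard library to check the shape of `data` in the quoted payload and to show what an array-wrapped variant would look like. The helper names `data_shape` and `wrap_data_in_array` are hypothetical, and whether wrapping is the right fix for this pipeline is an assumption.

```python
import json

# Payload copied from the "Corrupt Canal JSON message" line of the trace.
PAYLOAD = (
    '{"data":{"runoob_id":74,"runoob_title":"学习 PHP",'
    '"runoob_author":"菜鸟教程4444","submission_date":"2024-06-06"},'
    '"type":"INSERT"}'
)


def data_shape(message: str) -> str:
    """Report whether the top-level `data` field is an array, an object, or something else."""
    data = json.loads(message).get("data")
    if isinstance(data, list):
        return "array"
    if isinstance(data, dict):
        return "object"
    return "other"


def wrap_data_in_array(message: str) -> str:
    """Hypothetical helper: wrap an object-valued `data` field in a one-element array.

    Messages whose `data` is already an array (or absent) are returned with
    their content unchanged.
    """
    doc = json.loads(message)
    if isinstance(doc.get("data"), dict):
        doc["data"] = [doc["data"]]
    return json.dumps(doc, ensure_ascii=False)


if __name__ == "__main__":
    print(data_shape(PAYLOAD))
    print(data_shape(wrap_data_in_array(PAYLOAD)))
```

A check like this on the producer side, before messages reach the `test1` topic, would show whether every record has the shape the deserializer is trying to read.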

