Hao Yu created FLINK-38769:
------------------------------

             Summary: Flink CDC parses MySQL data incompletely
                 Key: FLINK-38769
                 URL: https://issues.apache.org/jira/browse/FLINK-38769
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.0
         Environment: Flink 1.17

Flink CDC 3.1
            Reporter: Hao Yu


Our Flink CDC job failed with an exception.

 
The exception is as follows:
{code:java}
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=xxx, file=xxx-bin.005958, pos=550478108, gtids=02a14707-1da1-11ef-9507-08c0eb82bce8:1-12266088830,3d496217-ca2c-11ea-9cde-e4434bd6a6e4:1-421074,3f0aa7dd-2ac3-11ef-99f0-043f72f048fe:1-184350,54e0c8f3-7a06-11ea-b241-fa163e0b065c:1-117143907,9a85d197-4452-11ec-86d9-b8cef6e1aa34:1-384810,b94d9c8d-bc20-11ea-8cce-0c42a1453028:1-564113948,cded5277-3135-11e9-b497-fa163e967ebc:1-46832,cf6f6734-1460-11eb-834a-0c42a1457a78:1-8142594382,ee0fafda-199a-11e9-89d3-fa163e9b5db4:1-92998908, row=1, server_id=20743, event=3}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:248)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:842)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:947)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:833)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:1058)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:409)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "datachange_createtime", schema type: STRING
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.Struct.validate(Struct.java:243)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:226)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:213)
    at io.debezium.data.Envelope.create(Envelope.java:298)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:82)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:211)
    ... 12 more
{code}
We added logging to RelationalChangeRecordEmitter and found that the records 
are incomplete.

The table has 20 columns, but the record contains only 17 (f0 through f16); 
the last 3 columns (f17, f18, f19) are missing.

Schema info 
{code:java}
{
  "schema": {
    "key": {
      "name": "mysql_binlog_source.xxxx.Key",
      "type": "STRUCT",
      "optional": "false",
      "default": null,
      "fields": [
        {
          "name": "f0",
          "index": "0",
          "schema": { "type": "INT64", "optional": "false", "default": null }
        }
      ]
    },
    "value": {
      "name": "mysql_binlog_source.xxxx",
      "type": "STRUCT",
      "optional": "true",
      "default": null,
      "fields": [
        { "name": "f0",  "index": "0",  "schema": { "type": "INT64", "optional": "false", "default": null }},
        { "name": "f1",  "index": "1",  "schema": { "type": "INT64", "optional": "false", "default": "0" }},
        { "name": "f2",  "index": "2",  "schema": { "type": "INT32", "optional": "false", "default": "0" }},
        { "name": "f3",  "index": "3",  "schema": { "type": "INT64", "optional": "false", "default": "0" }},
        { "name": "f4",  "index": "4",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" }},
        { "name": "f5",  "index": "5",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" }},
        { "name": "f6",  "index": "6",  "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f7",  "index": "7",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.0000", "version": "1" }},
        { "name": "f8",  "index": "8",  "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f9",  "index": "9",  "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f10", "index": "10", "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f11", "index": "11", "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f12", "index": "12", "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f13", "index": "13", "schema": { "type": "STRING", "optional": "false", "default": "" }},
        { "name": "f14", "index": "14", "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" }},
        { "name": "f15", "index": "15", "schema": { "type": "INT64", "optional": "false", "default": "0" }},
        { "name": "f16", "index": "16", "schema": { "type": "INT16", "optional": "false", "default": "1" }},
        { "name": "f17", "index": "17", "schema": { "type": "STRING", "optional": "false", "default": null }},
        { "name": "f18", "index": "18", "schema": { "type": "STRING", "optional": "false", "default": null }},
        { "name": "f19", "index": "19", "schema": { "type": "STRING", "optional": "true",  "default": "" }}
      ]
    }
  }
}
 {code}
Data info 
{code:java}
record: Struct{
f0=150000,
f1=280000,
f2=4900000,
f3=600,
f4=0.66,
f5=0.66,
f6=XXX,
f7=1.0000,
f8=XXX,
f9=XXX,
f10=,
f11=,
f12=,
f13=,
f14=0.00,
f15=3549000000,
f16=1
}
 {code}
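The comparison between the schema and the record dump can be sketched as a small standalone check. This is only an illustration, not code from Flink CDC, Debezium, or Kafka Connect: the class `MissingFieldCheck` and its helpers are hypothetical, the field names are the anonymized `f0` to `f19` from the dumps, and the validation rule (a null value fails only when the field is required and has no default) is an assumption modeled on how Kafka Connect's `Struct.validate` appears to behave in the stack trace.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical diagnostic helper; not part of Flink CDC, Debezium, or Kafka Connect. */
class MissingFieldCheck {

    /** Minimal stand-in for one schema field: name, optional flag, and whether a default exists. */
    static final class FieldSpec {
        final String name;
        final boolean optional;
        final boolean hasDefault;

        FieldSpec(String name, boolean optional, boolean hasDefault) {
            this.name = name;
            this.optional = optional;
            this.hasDefault = hasDefault;
        }
    }

    /** The 20-field value schema from the report, reduced to the attributes that matter here. */
    static List<FieldSpec> reportedSchema() {
        List<FieldSpec> schema = new ArrayList<>();
        schema.add(new FieldSpec("f0", false, false));       // required, default null
        for (int i = 1; i <= 16; i++) {
            schema.add(new FieldSpec("f" + i, false, true)); // required, non-null default
        }
        schema.add(new FieldSpec("f17", false, false));      // required, default null
        schema.add(new FieldSpec("f18", false, false));      // required, default null
        schema.add(new FieldSpec("f19", true, true));        // optional
        return schema;
    }

    /** The fields actually present in the logged record: f0 through f16. */
    static Set<String> reportedRecordFields() {
        Set<String> present = new LinkedHashSet<>();
        for (int i = 0; i <= 16; i++) {
            present.add("f" + i);
        }
        return present;
    }

    /** Schema fields with no value in the record, in schema order. */
    static List<String> missingFields(List<FieldSpec> schema, Set<String> present) {
        List<String> missing = new ArrayList<>();
        for (FieldSpec field : schema) {
            if (!present.contains(field.name)) {
                missing.add(field.name);
            }
        }
        return missing;
    }

    /** Missing fields that are required and have no default (assumed to fail validation). */
    static List<String> failingFields(List<FieldSpec> schema, Set<String> present) {
        List<String> failing = new ArrayList<>();
        for (FieldSpec field : schema) {
            if (!present.contains(field.name) && !field.optional && !field.hasDefault) {
                failing.add(field.name);
            }
        }
        return failing;
    }

    public static void main(String[] args) {
        List<FieldSpec> schema = reportedSchema();
        Set<String> present = reportedRecordFields();
        System.out.println("missing: " + missingFields(schema, present));
        System.out.println("would fail validation: " + failingFields(schema, present));
    }
}
```

Under these assumptions the check reports f17, f18, and f19 as missing, and only f17 and f18 as fields that would fail validation, since f19 is optional. The field "datachange_createtime" named in the exception is presumably one of those two required columns, although the anonymized dumps do not confirm which.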
I suspect that Debezium did not parse the row completely.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
