Hao Yu created FLINK-38769:
------------------------------
Summary: Flink CDC parses MySQL data incompletely
Key: FLINK-38769
URL: https://issues.apache.org/jira/browse/FLINK-38769
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.1.0
Environment: Flink 1.17
Flink CDC 3.1
Reporter: Hao Yu
Our Flink CDC job caught the following exception:
{code:java}
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=xxx, file=xxx-bin.005958, pos=550478108, gtids=02a14707-1da1-11ef-9507-08c0eb82bce8:1-12266088830,3d496217-ca2c-11ea-9cde-e4434bd6a6e4:1-421074,3f0aa7dd-2ac3-11ef-99f0-043f72f048fe:1-184350,54e0c8f3-7a06-11ea-b241-fa163e0b065c:1-117143907,9a85d197-4452-11ec-86d9-b8cef6e1aa34:1-384810,b94d9c8d-bc20-11ea-8cce-0c42a1453028:1-564113948,cded5277-3135-11e9-b497-fa163e967ebc:1-46832,cf6f6734-1460-11eb-834a-0c42a1457a78:1-8142594382,ee0fafda-199a-11e9-89d3-fa163e9b5db4:1-92998908, row=1, server_id=20743, event=3}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:248)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:842)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:947)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:833)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:1058)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:409)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "datachange_createtime", schema type: STRING
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.Struct.validate(Struct.java:243)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:226)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:213)
    at io.debezium.data.Envelope.create(Envelope.java:298)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:82)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:211)
    ... 12 more{code}
We added logging in RelationalChangeRecordEmitter and found that the emitted
records are incomplete: the table has 20 columns, but only 17 of them appear
in the record, with the last 3 columns missing.
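The root-cause DataException is Kafka Connect rejecting a null value for a non-optional field that has no default. A minimal self-contained sketch of that rule (a hypothetical stand-in for ConnectSchema.validateValue, not the real Connect code):

```java
import java.util.Objects;

public class RequiredFieldCheck {

    /** Simplified stand-in for a Connect schema field definition. */
    static class Field {
        final String name;
        final boolean optional;
        final Object defaultValue;

        Field(String name, boolean optional, Object defaultValue) {
            this.name = Objects.requireNonNull(name);
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    /**
     * Mimics the required-field rule behind the stack trace: a null value is
     * only accepted when the field is optional or has a non-null default.
     */
    static void validate(Field field, Object value) {
        if (value == null && !field.optional && field.defaultValue == null) {
            throw new RuntimeException(
                "Invalid value: null used for required field: \"" + field.name + "\"");
        }
    }

    public static void main(String[] args) {
        // f17/f18 in the schema dump below are STRING, optional=false,
        // default=null; "datachange_createtime" is presumably one of them
        // (the dump anonymizes column names).
        Field required = new Field("datachange_createtime", false, null);
        try {
            validate(required, null);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
        // An optional field accepts null without complaint.
        validate(new Field("f19", true, ""), null);
    }
}
```

This matches the failing path above: Struct.put calls ConnectSchema.validateValue, which throws before the record ever leaves the emitter.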
Schema info
{code:java}
{
  "schema": {
    "key": {
      "name": "mysql_binlog_source.xxxx.Key",
      "type": "STRUCT",
      "optional": "false",
      "default": null,
      "fields": [
        { "name": "f0", "index": "0", "schema": { "type": "INT64", "optional": "false", "default": null } }
      ]
    },
    "value": {
      "name": "mysql_binlog_source.xxxx",
      "type": "STRUCT",
      "optional": "true",
      "default": null,
      "fields": [
        { "name": "f0",  "index": "0",  "schema": { "type": "INT64",  "optional": "false", "default": null } },
        { "name": "f1",  "index": "1",  "schema": { "type": "INT64",  "optional": "false", "default": "0" } },
        { "name": "f2",  "index": "2",  "schema": { "type": "INT32",  "optional": "false", "default": "0" } },
        { "name": "f3",  "index": "3",  "schema": { "type": "INT64",  "optional": "false", "default": "0" } },
        { "name": "f4",  "index": "4",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" } },
        { "name": "f5",  "index": "5",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" } },
        { "name": "f6",  "index": "6",  "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f7",  "index": "7",  "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.0000", "version": "1" } },
        { "name": "f8",  "index": "8",  "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f9",  "index": "9",  "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f10", "index": "10", "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f11", "index": "11", "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f12", "index": "12", "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f13", "index": "13", "schema": { "type": "STRING", "optional": "false", "default": "" } },
        { "name": "f14", "index": "14", "schema": { "name": "org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false", "default": "0.00", "version": "1" } },
        { "name": "f15", "index": "15", "schema": { "type": "INT64",  "optional": "false", "default": "0" } },
        { "name": "f16", "index": "16", "schema": { "type": "INT16",  "optional": "false", "default": "1" } },
        { "name": "f17", "index": "17", "schema": { "type": "STRING", "optional": "false", "default": null } },
        { "name": "f18", "index": "18", "schema": { "type": "STRING", "optional": "false", "default": null } },
        { "name": "f19", "index": "19", "schema": { "type": "STRING", "optional": "true",  "default": "" } }
      ]
    }
  }
}
{code}
Data info
{code:java}
record: Struct{
f0=150000,
f1=280000,
f2=4900000,
f3=600,
f4=0.66,
f5=0.66,
f6=XXX,
f7=1.0000,
f8=XXX,
f9=XXX,
f10=,
f11=,
f12=,
f13=,
f14=0.00,
f15=3549000000,
f16=1
}
{code}
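Comparing the value schema's 20 fields with the 17 fields present in the logged record pinpoints the gap; a small sketch of that diff (field names f0..f19 taken from the dumps above):

```java
import java.util.ArrayList;
import java.util.List;

public class MissingFields {

    /** The 20 field names from the value schema dump (f0..f19). */
    static List<String> schemaFields() {
        List<String> fields = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            fields.add("f" + i);
        }
        return fields;
    }

    /** The 17 fields actually present in the logged record (f0..f16). */
    static List<String> recordFields() {
        List<String> fields = new ArrayList<>();
        for (int i = 0; i < 17; i++) {
            fields.add("f" + i);
        }
        return fields;
    }

    /** Schema fields that have no value in the record. */
    static List<String> findMissing() {
        List<String> missing = new ArrayList<>(schemaFields());
        missing.removeAll(recordFields());
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(findMissing()); // prints [f17, f18, f19]
    }
}
```

The missing columns are f17, f18, and f19. Since f17 and f18 are declared optional=false with default=null, either of them being absent is enough to trigger the DataException above.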
I suspect that Debezium did not fully parse the row during binlog processing, leaving the trailing columns unset.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)