Flink CDC Issue Import created FLINK-34824: ----------------------------------------------
Summary: Mysql-connector cdc SourceRecord does not have data type
information, but dbz record does
Key: FLINK-34824
URL: https://issues.apache.org/jira/browse/FLINK-34824
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
nothing similar.
### Motivation
When we develop a streaming data platform, we also need data types in order to
support dynamically changing table schemas. But I found that the Mysql-connector
cdc record has too much useless information. I suggest using the type
information from the dbz (Debezium) record to meet more needs.
### Solution
Change the SourceRecord code to use the dbz record, such as the example below.
### Alternatives
EmbeddedEngineChangeEvent[key = {
"schema": {
"type": "struct",
"fields": [{
"type": "int64",
"optional": false,
"field": "id"
}],
"optional": false,
"name": "mysql_connector.gmall.activity_info.Key"
},
"payload": {
"id": 3
}
}, value = {
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "int64",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": true,
"field": "activity_name"
}, {
"type": "string",
"optional": true,
"field": "activity_type"
}, {
"type": "string",
"optional": true,
"field": "activity_desc"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "start_time"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "end_time"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "create_time"
}],
"optional": true,
"name":
"mysql_connector.gmall.activity_info.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "int64",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": true,
"field": "activity_name"
}, {
"type": "string",
"optional": true,
"field": "activity_type"
}, {
"type": "string",
"optional": true,
"field": "activity_desc"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "start_time"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "end_time"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "create_time"
}],
"optional": true,
"name":
"mysql_connector.gmall.activity_info.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed":
"true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": true,
"field": "sequence"
}, {
"type": "string",
"optional": true,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "server_id"
}, {
"type": "string",
"optional": true,
"field": "gtid"
}, {
"type": "string",
"optional": false,
"field": "file"
}, {
"type": "int64",
"optional": false,
"field": "pos"
}, {
"type": "int32",
"optional": false,
"field": "row"
}, {
"type": "int64",
"optional": true,
"field": "thread"
}, {
"type": "string",
"optional": true,
"field": "query"
}],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"field": "transaction"
}],
"optional": false,
"name": "mysql_connector.gmall.activity_info.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 3,
"activity_name": "ccccc",
"activity_type": "1003",
"activity_desc": "fffff",
"start_time": null,
"end_time": null,
"create_time": null
},
"source": {
"version": "1.9.5.Final",
"connector": "mysql",
"name": "mysql-connector",
"ts_ms": 1694568910248,
"snapshot": "true",
"db": "gmall",
"sequence": null,
"table": "activity_info",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000015",
"pos": 154,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1694568910248,
"transaction": null
}
}, sourceRecord = SourceRecord {
sourcePartition = {
server = mysql-connector
}, sourceOffset = {
ts_sec = 1694568910,
file = mysql-bin.000015,
pos = 154,
snapshot = true
}
}
ConnectRecord {
topic = 'mysql-connector.gmall.activity_info', kafkaPartition =
null, key = Struct {
id = 3
}, keySchema = Schema {
mysql_connector.gmall.activity_info.Key: STRUCT
}, value = Struct {
after = Struct {
id = 3, activity_name = ccccc, activity_type =
1003, activity_desc = fffff
}, source = Struct {
version = 1.9.5.Final, connector = mysql, name
= mysql-connector, ts_ms = 1694568910248, snapshot = true, db = gmall, table
= activity_info, server_id = 0, file = mysql-bin.000015, pos = 154, row = 0
}, op = r, ts_ms = 1694568910248
}, valueSchema = Schema {
mysql_connector.gmall.activity_info.Envelope: STRUCT
}, timestamp = null, headers = ConnectHeaders(headers = )
}]
### Anything else?
_No response_
### Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2614
Created by: [niuhu3|https://github.com/niuhu3]
Labels: enhancement,
Created at: Fri Nov 03 16:23:04 CST 2023
State: open
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
