ZhengYu Chen created FLINK-35173:
------------------------------------
Summary: Debezium Custom Time Serializer
Key: FLINK-35173
URL: https://issues.apache.org/jira/browse/FLINK-35173
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: 3.1.0
Reporter: ZhengYu Chen
Fix For: 3.1.0
Currently, Flink CDC Time encounters time type errors (including DateTime,
Time, Date, TimeStamp) when using MySQL Connector
(JsonDebeziumDeserializationSchema) as deserialization, and the converted time
is wrong. The essential reason is that the timestamp returned by the bottom
layer of debezium is UTC (such as io.debezium.time.Timestamp). The community
has already had some
[PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
but they are not work.
Now a way is provided to provide a solution based on Debezium's custom Convert
interface
(https://debezium.io/documentation/reference/1.9/development/converters.html),
Users can choose to convert the above four time types into STRING according to
the specified time format to ensure that users can correctly convert JSON when
using the Flink DataStream API.
When the user enables this converter, we need to configure it according to the
parameters, That's some datastream use case:
{code:java}
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("converters", "datetime");
debeziumProperties.setProperty("datetime.database.type",
DataBaseType.MYSQL.getType());
debeziumProperties.setProperty("datetime.type",
"cn.xxx.sources.cdc.MysqlDebeziumConverter");
debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd
HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd
HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
.hostname(url[0])
.port(Integer.parseInt(url[1]))
.databaseList(table.getDatabase())
.tableList(getTablePattern(table))
.username(table.getUserName())
.password(table.getPassword())
.debeziumProperties(debeziumProperties); {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)