ZhengYu Chen created FLINK-35173:
------------------------------------

             Summary: Debezium Custom Time Serializer 
                 Key: FLINK-35173
                 URL: https://issues.apache.org/jira/browse/FLINK-35173
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: 3.1.0
            Reporter: ZhengYu Chen
             Fix For: 3.1.0


Currently, Flink CDC produces incorrect values for time types (DateTime, 
Time, Date, Timestamp) when the MySQL connector uses 
JsonDebeziumDeserializationSchema for deserialization. The root cause is that 
the underlying Debezium layer returns timestamps in UTC (for example, 
io.debezium.time.Timestamp). The community has already submitted some 
[PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
 but they do not work.

This issue proposes a solution based on Debezium's custom converter interface 
(https://debezium.io/documentation/reference/1.9/development/converters.html). 
Users can choose to convert the four time types above into STRING using a 
specified time format, so that the JSON produced when using the Flink 
DataStream API carries the correct time values.
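
As a rough illustration of the conversion described above, the sketch below formats two of Debezium's raw temporal encodings into strings: io.debezium.time.Date (days since the epoch) and io.debezium.time.Timestamp (milliseconds since the epoch). It uses only java.time and is not the proposed converter itself; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only; not part of Flink CDC or Debezium.
class TemporalFormatSketch {

    // io.debezium.time.Date encodes a date as the number of days since 1970-01-01.
    static String formatEpochDays(int epochDays, String pattern) {
        return LocalDate.ofEpochDay(epochDays)
                .format(DateTimeFormatter.ofPattern(pattern));
    }

    // io.debezium.time.Timestamp encodes a value as milliseconds since the epoch.
    // The zone decides which wall-clock time ends up in the output string.
    static String formatEpochMillis(long epochMillis, String pattern, String zone) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.of(zone))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatEpochDays(19830, "yyyy-MM-dd"));
        System.out.println(formatEpochMillis(1713340800000L, "yyyy-MM-dd HH:mm:ss", "UTC"));
        System.out.println(formatEpochMillis(1713340800000L, "yyyy-MM-dd HH:mm:ss", "UTC+8"));
    }
}
```

The same epoch value yields different strings depending on the zone, which is why the target zone needs to be an explicit setting and not left to the JVM default.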


When the converter is enabled, it must be configured through Debezium 
properties. An example DataStream use case:
{code:java}
Properties debeziumProperties = new Properties();
// Register a converter under the logical name "datetime" and point it at the implementing class.
debeziumProperties.setProperty("converters", "datetime");
debeziumProperties.setProperty("datetime.type", "cn.xxx.sources.cdc.MysqlDebeziumConverter");
debeziumProperties.setProperty("datetime.database.type", DataBaseType.MYSQL.getType());
// Output patterns for each of the four time types.
debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
// Time zone used when rendering TIMESTAMP values.
debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
        .hostname(url[0])
        .port(Integer.parseInt(url[1]))
        .databaseList(table.getDatabase())
        .tableList(getTablePattern(table))
        .username(table.getUserName())
        .password(table.getPassword())
        .debeziumProperties(debeziumProperties);
{code}
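
For orientation, here is a minimal sketch of how a converter might read these settings. Debezium hands a converter its own properties with the "datetime." prefix removed, so the keys appear as "format.date", "format.timestamp.zone", and so on. The class name and the default values are assumptions made for this example and are not taken from the proposed implementation.

```java
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

// Hypothetical configuration holder; a real converter would do this inside
// its configure(Properties) method. Defaults below are assumed for illustration.
class ConverterConfigSketch {
    final DateTimeFormatter dateFormat;
    final DateTimeFormatter timeFormat;
    final DateTimeFormatter datetimeFormat;
    final DateTimeFormatter timestampFormat;
    final ZoneId timestampZone;

    ConverterConfigSketch(Properties props) {
        // Keys arrive without the converter-name prefix ("datetime.").
        dateFormat = DateTimeFormatter.ofPattern(
                props.getProperty("format.date", "yyyy-MM-dd"));
        timeFormat = DateTimeFormatter.ofPattern(
                props.getProperty("format.time", "HH:mm:ss"));
        datetimeFormat = DateTimeFormatter.ofPattern(
                props.getProperty("format.datetime", "yyyy-MM-dd HH:mm:ss"));
        timestampZone = ZoneId.of(
                props.getProperty("format.timestamp.zone", "UTC"));
        // Bind the zone to the timestamp formatter so Instants can be formatted directly.
        timestampFormat = DateTimeFormatter.ofPattern(
                props.getProperty("format.timestamp", "yyyy-MM-dd HH:mm:ss"))
                .withZone(timestampZone);
    }
}
```

Building the formatters once at configuration time means an invalid pattern or zone fails early, at startup, and not on the first change event.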
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
