This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6232d8405 [FLINK-35173][cdc][mysql] Debezium custom time serializer 
for MySQL connector (#3240)
6232d8405 is described below

commit 6232d84052422aa88299f28074a8437e91db2988
Author: ConradJam <jam.gz...@gmail.com>
AuthorDate: Fri Apr 26 14:10:34 2024 +0800

    [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL 
connector (#3240)
---
 .../cdc/debezium/utils/ConvertTimeBceUtil.java     |  58 ++++
 .../converters/MysqlDebeziumTimeConverter.java     | 326 +++++++++++++++++++++
 .../MysqlDebeziumTimeConverterITCase.java          | 298 +++++++++++++++++++
 .../src/test/resources/ddl/date_convert_test.sql   |  36 +++
 4 files changed, 718 insertions(+)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
new file mode 100644
index 000000000..b6ac0b226
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.debezium.utils;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.chrono.IsoEra;
+
+/** Convert And Check TimeBce Util. */
+public class ConvertTimeBceUtil {
+
+    private static final Date ONE_CE = Date.valueOf("0001-01-01");
+
+    public static String resolveEra(boolean isBce, String value) {
+        String mangledValue = value;
+        if (isBce) {
+            if (mangledValue.startsWith("-")) {
+                mangledValue = mangledValue.substring(1);
+            }
+            if (!mangledValue.endsWith(" BC")) {
+                mangledValue += " BC";
+            }
+        }
+        return mangledValue;
+    }
+
+    public static boolean isBce(LocalDate date) {
+        return date.getEra() == IsoEra.BCE;
+    }
+
+    public static String resolveEra(LocalDate date, String value) {
+        return resolveEra(isBce(date), value);
+    }
+
+    public static String resolveEra(Date date, String value) {
+        return resolveEra(date.before(ONE_CE), value);
+    }
+
+    public static String resolveEra(Timestamp timestamp, String value) {
+        return resolveEra(timestamp.before(ONE_CE), value);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
new file mode 100644
index 000000000..493fd682c
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql.converters;
+
+import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import io.debezium.time.Conversions;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Debezium converts the datetime type in MySQL into a UTC timestamp by 
default ({@link
+ * io.debezium.time.Timestamp} ),The time zone is hard-coded and cannot be 
changed. causing
+ * conversion errors part of the time Enable this converter to convert the 
four times "DATE",
+ * "DATETIME", "TIME", and "TIMESTAMP" into the format corresponding to the 
configured time zone
+ * (for example, yyyy-MM-dd)
+ *
+ * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
+ */
+public class MysqlDebeziumTimeConverter
+        implements CustomConverter<SchemaBuilder, RelationalColumn> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(MysqlDebeziumTimeConverter.class);
+
+    private static boolean loggedUnknownTimestampClass = false;
+    private static boolean loggedUnknownDateClass = false;
+    private static boolean loggedUnknownTimeClass = false;
+    private static boolean loggedUnknownTimestampWithTimeZoneClass = false;
+
+    private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", 
"TIMESTAMP"};
+
+    protected static final String DATE_FORMAT = "yyyy-MM-dd";
+    protected static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    protected static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    protected ZoneId zoneId;
+    protected static final String DEFAULT_DATE_FORMAT_PATTERN = "1970-01-01 
00:00:00";
+    protected DateTimeFormatter dateFormatter;
+    protected DateTimeFormatter timeFormatter;
+    protected DateTimeFormatter datetimeFormatter;
+    protected DateTimeFormatter timestampFormatter;
+    protected String schemaNamePrefix;
+    protected static DateTimeFormatter originalFormat =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    protected Boolean parseNullDefaultValue = true;
+
+    @Override
+    public void configure(Properties properties) {
+        String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
+        String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
+        String datetimeFormat = properties.getProperty("format.datetime", 
DATETIME_FORMAT);
+        String timestampFormat = properties.getProperty("format.timestamp", 
DATETIME_FORMAT);
+        this.parseNullDefaultValue =
+                Boolean.parseBoolean(
+                        properties.getProperty("format.default.value.convert", 
"true"));
+        String className = this.getClass().getName();
+        this.schemaNamePrefix = properties.getProperty("schema.name.prefix", 
className + ".mysql");
+        this.dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
+        this.timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
+        this.datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
+        this.timestampFormatter = DateTimeFormatter.ofPattern(timestampFormat);
+        this.zoneId =
+                ZoneId.of(
+                        properties.getProperty(
+                                "format.timezone", 
ZoneId.systemDefault().toString()));
+    }
+
+    @Override
+    public void converterFor(
+            final RelationalColumn field, final 
ConverterRegistration<SchemaBuilder> registration) {
+        if (Arrays.stream(DATE_TYPES).anyMatch(s -> 
s.equalsIgnoreCase(field.typeName()))) {
+            registerDateConverter(field, registration);
+        }
+    }
+
+    private void registerDateConverter(
+            final RelationalColumn field, final 
ConverterRegistration<SchemaBuilder> registration) {
+        String columnType = field.typeName().toUpperCase();
+        String schemaName = this.schemaNamePrefix + "." + 
columnType.toLowerCase();
+        registration.register(
+                SchemaBuilder.string().name(schemaName).optional(),
+                value -> {
+                    log.debug(
+                            "find schema need to change dateType, field 
name:{} ,field type:{} ,field value:{} ,field "
+                                    + "default:{}",
+                            field.name(),
+                            columnType,
+                            value == null ? "null" : value,
+                            field.hasDefaultValue() ? field.defaultValue() : 
"null");
+                    if (value == null) {
+                        return convertDateDefaultValue(field);
+                    }
+                    switch (columnType.toUpperCase(Locale.ROOT)) {
+                        case "DATE":
+                            if (value instanceof Integer) {
+                                return this.convertToDate(
+                                        columnType, 
LocalDate.ofEpochDay((Integer) value));
+                            }
+                            return this.convertToDate(columnType, value);
+                        case "TIME":
+                            if (value instanceof Long) {
+                                long l =
+                                        Math.multiplyExact(
+                                                (Long) value, 
TimeUnit.MICROSECONDS.toNanos(1));
+                                return this.convertToTime(columnType, 
LocalTime.ofNanoOfDay(l));
+                            }
+                            return this.convertToTime(columnType, value);
+                        case "DATETIME":
+                            if (value instanceof Long) {
+                                if (getTimePrecision(field) <= 3) {
+                                    return this.convertToTimestamp(
+                                            columnType,
+                                            
Conversions.toInstantFromMillis((Long) value));
+                                }
+                                if (getTimePrecision(field) <= 6) {
+                                    return this.convertToTimestamp(
+                                            columnType,
+                                            
Conversions.toInstantFromMicros((Long) value));
+                                }
+                            }
+                            return this.convertToTimestamp(columnType, value);
+                        case "TIMESTAMP":
+                            return 
this.convertToTimestampWithTimezone(columnType, value);
+                        default:
+                            throw new IllegalArgumentException(
+                                    "Unknown field type  " + 
columnType.toUpperCase(Locale.ROOT));
+                    }
+                });
+    }
+
+    private Object convertToTimestampWithTimezone(String columnType, Object 
timestamp) {
+        // In snapshot mode, debezium produces a java.sql.Timestamp object for 
the TIMESTAMPTZ type.
+        // Conceptually, a timestamp with timezone is an Instant. But 
t.toInstant() actually
+        // mangles the value for ancient dates, because leap years weren't 
applied consistently in
+        // ye olden days. Additionally, toInstant() (and toLocalDateTime()) 
actually lose the era
+        // indicator,
+        // so we can't rely on their getEra() methods.
+        // So we have special handling for this case, which sidesteps the 
toInstant conversion.
+        if (timestamp instanceof Timestamp) {
+            Timestamp value = (Timestamp) timestamp;
+            ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
+            return ConvertTimeBceUtil.resolveEra(value, 
zonedDateTime.format(timestampFormatter));
+        } else if (timestamp instanceof OffsetDateTime) {
+            OffsetDateTime value = (OffsetDateTime) timestamp;
+            return ConvertTimeBceUtil.resolveEra(
+                    value.toLocalDate(), value.format(timestampFormatter));
+        } else if (timestamp instanceof ZonedDateTime) {
+            ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
+            return ConvertTimeBceUtil.resolveEra(
+                    zonedDateTime.toLocalDate(), 
zonedDateTime.format(timestampFormatter));
+        } else if (timestamp instanceof Instant) {
+            OffsetDateTime dateTime = OffsetDateTime.ofInstant((Instant) 
timestamp, zoneId);
+            ZonedDateTime timestampZt = ZonedDateTime.from(dateTime);
+            LocalDate localDate = timestampZt.toLocalDate();
+            return ConvertTimeBceUtil.resolveEra(localDate, 
timestampZt.format(timestampFormatter));
+        } else {
+            if (!loggedUnknownTimestampWithTimeZoneClass) {
+                printUnknownDateClassLogs(columnType, timestamp);
+                loggedUnknownTimestampWithTimeZoneClass = true;
+            }
+            // If init 1970-01-01T00:00:00Zd need to change
+            Instant instant = Instant.parse(timestamp.toString());
+            OffsetDateTime dateTime = OffsetDateTime.ofInstant(instant, 
zoneId);
+            ZonedDateTime timestampZt = ZonedDateTime.from(dateTime);
+            LocalDate localDate = timestampZt.toLocalDate();
+            return ConvertTimeBceUtil.resolveEra(localDate, 
timestampZt.format(timestampFormatter));
+        }
+    }
+
+    private Object convertToTimestamp(String columnType, Object timestamp) {
+        if (timestamp instanceof Timestamp) {
+            // Snapshot mode
+            LocalDateTime localDateTime = ((Timestamp) 
timestamp).toLocalDateTime();
+            return ConvertTimeBceUtil.resolveEra(
+                    ((Timestamp) timestamp), 
localDateTime.format(datetimeFormatter));
+        } else if (timestamp instanceof Instant) {
+            // Incremental mode
+            Instant time = (Instant) timestamp;
+            ZonedDateTime zonedDateTime = time.atZone(zoneId);
+            return ConvertTimeBceUtil.resolveEra(
+                    zonedDateTime.toLocalDate(),
+                    time.atOffset(zonedDateTime.getOffset())
+                            .toLocalDateTime()
+                            .format(datetimeFormatter));
+        } else if (timestamp instanceof LocalDateTime) {
+            LocalDateTime dateTime = (LocalDateTime) timestamp;
+            LocalDate localDateTime = dateTime.toLocalDate();
+            return ConvertTimeBceUtil.resolveEra(localDateTime, 
dateTime.format(datetimeFormatter));
+        }
+        if (!loggedUnknownTimestampClass) {
+            printUnknownDateClassLogs(columnType, timestamp);
+            loggedUnknownTimestampClass = true;
+        }
+        LocalDateTime localDateTime = 
LocalDateTime.parse(timestamp.toString());
+        LocalDate localDate = localDateTime.toLocalDate();
+        return ConvertTimeBceUtil.resolveEra(localDate, 
localDateTime.format(datetimeFormatter));
+    }
+
+    private Object convertToTime(String columnType, Object time) {
+        if (time instanceof Time) {
+            return formatTime(((Time) time).toLocalTime());
+        } else if (time instanceof LocalTime) {
+            return formatTime((LocalTime) time);
+        } else if (time instanceof java.time.Duration) {
+            long value = ((java.time.Duration) time).toNanos();
+            if (value >= 0 && value < TimeUnit.DAYS.toNanos(1)) {
+                return formatTime(LocalTime.ofNanoOfDay(value));
+            } else {
+                long updatedValue = Math.min(Math.abs(value), 
LocalTime.MAX.toNanoOfDay());
+                log.debug(
+                        "Time values must use number of nanoseconds greater 
than 0 and less than 86400000000000 but its {}, "
+                                + "converting to {} ",
+                        value,
+                        updatedValue);
+                return formatTime(LocalTime.ofNanoOfDay(updatedValue));
+            }
+        } else {
+            if (!loggedUnknownTimeClass) {
+                printUnknownDateClassLogs(columnType, time);
+                loggedUnknownTimeClass = true;
+            }
+
+            String valueAsString = time.toString();
+            if (valueAsString.startsWith("24")) {
+                log.debug("Time value {} is above range, converting to 
23:59:59", valueAsString);
+                return LocalTime.MAX.toString();
+            }
+            return formatTime(LocalTime.parse(valueAsString));
+        }
+    }
+
+    private String formatTime(LocalTime localTime) {
+        return localTime.format(timeFormatter);
+    }
+
+    private int getTimePrecision(final RelationalColumn field) {
+        return field.length().orElse(-1);
+    }
+
+    private String convertToDate(String columnType, Object date) {
+        if (date instanceof Date) {
+            // Snapshot mode
+            LocalDate localDate = ((Date) date).toLocalDate();
+            return ConvertTimeBceUtil.resolveEra(localDate, 
localDate.format(dateFormatter));
+        } else if (date instanceof LocalDate) {
+            // Incremental mode
+            return dateFormatter.format((LocalDate) date);
+        } else if (date instanceof LocalDateTime) {
+            return datetimeFormatter.format((LocalDateTime) date);
+        } else if (date instanceof Integer) {
+            return LocalDate.ofEpochDay(((Integer) 
date).longValue()).format(dateFormatter);
+        } else {
+            if (!loggedUnknownDateClass) {
+                printUnknownDateClassLogs(columnType, date);
+                loggedUnknownDateClass = true;
+            }
+            LocalDate localDate = LocalDate.parse(date.toString());
+            return ConvertTimeBceUtil.resolveEra(localDate, 
localDate.format(dateFormatter));
+        }
+    }
+
+    public Object convertDateDefaultValue(RelationalColumn field) {
+        if (field.isOptional()) {
+            return null;
+        } else if (field.hasDefaultValue()) {
+            // There is an extreme case where the field defaultValue is 0, 
resulting in a Kafka
+            // Schema mismatch
+            if (parseNullDefaultValue) {
+                LocalDateTime dateTime =
+                        LocalDateTime.parse(DEFAULT_DATE_FORMAT_PATTERN, 
originalFormat);
+                String columnType = field.typeName().toUpperCase();
+                switch (columnType.toUpperCase(Locale.ROOT)) {
+                    case "DATE":
+                        return dateTime.format(dateFormatter);
+                    case "DATETIME":
+                        return dateTime.format(datetimeFormatter);
+                    case "TIME":
+                        return dateTime.format(timeFormatter);
+                    case "TIMESTAMP":
+                        return dateTime.format(timestampFormatter);
+                }
+            }
+        }
+        return null;
+    }
+
+    private static void printUnknownDateClassLogs(String type, Object value) {
+        log.warn(
+                "MySql Date Convert Database type : {} Unknown class for Date 
data type {}",
+                type,
+                value.getClass());
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java
new file mode 100644
index 000000000..d14f31f97
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.debezium.converters;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter;
+import io.debezium.data.Envelope;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
+
+/**
+ * Test for {@link MysqlDebeziumTimeConverter}.
+ *
+ * <p>Every test starts its own MySQL container configured with the time zone under test, reads
+ * the {@code date_convert_test} table through the converter and checks the formatted strings.
+ */
+public class MysqlDebeziumTimeConverterITCase {
+
+    private static TemporaryFolder tempFolder;
+    private static File resourceFolder;
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MysqlDebeziumTimeConverterITCase.class);
+
+    @Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
+
+    @Before
+    public void setup() throws Exception {
+        // The generated my.cnf must live under the test classpath root because the container is
+        // given a path relative to that root (see buildMySqlConfigWithTimezone).
+        resourceFolder =
+                Paths.get(
+                                Objects.requireNonNull(
+                                                MySqlValidatorTest.class
+                                                        .getClassLoader()
+                                                        .getResource("."))
+                                        .toURI())
+                        .toFile();
+        tempFolder = new TemporaryFolder(resourceFolder);
+        tempFolder.create();
+        env.setParallelism(1);
+    }
+
+    @Test
+    public void testReadDateConvertDataStreamInJvmTime() throws Exception {
+        testReadDateConvertDataStreamSource(ZoneId.systemDefault().toString());
+    }
+
+    @Test
+    public void testReadDateConvertDataStreamInAsia() throws Exception {
+        testReadDateConvertDataStreamSource("Asia/Shanghai");
+    }
+
+    @Test
+    public void testReadDateConvertDataStreamInBerlin() throws Exception {
+        testReadDateConvertDataStreamSource("Europe/Berlin");
+    }
+
+    @Test
+    public void testReadDateConvertSQLSourceInAsia() throws Exception {
+        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
+    }
+
+    @Test
+    public void testReadDateConvertSQLSourceInBerlin() throws Exception {
+        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
+    }
+
+    /** Reads the three fixture rows with the DataStream API and checks the timestamp column. */
+    private void testReadDateConvertDataStreamSource(String timezone) throws Exception {
+        MySqlContainer mySqlContainer = createMySqlContainer(timezone);
+        try {
+            startContainers(mySqlContainer, timezone);
+            UniqueDatabase db = getUniqueDatabase(mySqlContainer);
+            db.createAndInitialize();
+            // Run on the environment prepared in setup(); a second, locally created environment
+            // would carry neither the checkpointing nor the parallelism settings.
+            env.enableCheckpointing(1000L);
+            MySqlSourceBuilder<String> builder =
+                    MySqlSource.<String>builder()
+                            .hostname(db.getHost())
+                            .port(db.getDatabasePort())
+                            .databaseList(db.getDatabaseName())
+                            .tableList(db.getDatabaseName() + ".date_convert_test")
+                            .startupOptions(StartupOptions.initial())
+                            .serverTimeZone(timezone)
+                            .username(db.getUsername())
+                            .password(db.getPassword())
+                            .debeziumProperties(getDebeziumConfigurations(timezone));
+            builder.deserializer(new JsonDebeziumDeserializationSchema());
+            DataStreamSource<String> convertDataStreamSource =
+                    env.fromSource(
+                            builder.build(),
+                            WatermarkStrategy.noWatermarks(),
+                            "testDataStreamSourceConvertData");
+            List<String> result = convertDataStreamSource.executeAndCollect(3);
+            validTimestampValue(result);
+        } finally {
+            releaseResources(mySqlContainer);
+        }
+    }
+
+    /** Checks {@code test_timestamp} of every change record against the value expected by id. */
+    private void validTimestampValue(List<String> result) throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+        String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
+        for (String changeRecord : result) {
+            JsonNode after = mapper.readTree(changeRecord).get(Envelope.FieldName.AFTER);
+            Assert.assertEquals(
+                    timestampValues[after.get("id").asInt() - 1],
+                    after.get("test_timestamp").asText());
+        }
+    }
+
+    /** Reads the fixture rows through a SQL table definition and checks all temporal columns. */
+    private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {
+        MySqlContainer mySqlContainer = createMySqlContainer(timezone);
+        try {
+            startContainers(mySqlContainer, timezone);
+            UniqueDatabase db = getUniqueDatabase(mySqlContainer);
+            db.createAndInitialize();
+            env.enableCheckpointing(1000L);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+            String sourceDDL =
+                    format(
+                            "CREATE TABLE customers ("
+                                    + " id BIGINT NOT NULL,"
+                                    + " test_timestamp STRING,"
+                                    + " test_datetime STRING,"
+                                    + " test_date STRING,"
+                                    + " test_time STRING, "
+                                    + "primary key (id) not enforced"
+                                    + ") WITH ("
+                                    + " 'connector' = 'mysql-cdc',"
+                                    + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                    + " 'hostname' = '%s',"
+                                    + " 'port' = '%s',"
+                                    + " 'username' = '%s',"
+                                    + " 'password' = '%s',"
+                                    + " 'database-name' = '%s',"
+                                    + " 'table-name' = 'date_convert_test',"
+                                    + " 'scan.startup.mode' = '%s',"
+                                    + " 'server-time-zone' = '%s',"
+                                    + " 'debezium.converters' = 'datetime',"
+                                    + " 'debezium.datetime.type' = '%s',"
+                                    + " 'debezium.database.connectionTimeZone' = '%s',"
+                                    + " 'debezium.datetime.format.time' = 'HH:mm:ss',"
+                                    + " 'debezium.datetime.format.timezone' = '%s',"
+                                    + " 'debezium.datetime.format.timestamp' = 'HH:mm:ss',"
+                                    + " 'debezium.datetime.format.default.value.convert' = 'true'"
+                                    + ")",
+                            mySqlContainer.getHost(),
+                            mySqlContainer.getDatabasePort(),
+                            db.getUsername(),
+                            db.getPassword(),
+                            db.getDatabaseName(),
+                            "initial",
+                            timezone,
+                            MysqlDebeziumTimeConverter.class.getName(),
+                            timezone,
+                            timezone);
+            tEnv.executeSql(sourceDDL);
+            TableResult tableResult = tEnv.executeSql("select * from customers");
+            checkData(tableResult);
+        } finally {
+            releaseResources(mySqlContainer);
+        }
+    }
+
+    /** Builds the Debezium properties that plug the converter in for the DataStream tests. */
+    private Properties getDebeziumConfigurations(String timezone) {
+        Properties debeziumProperties = new Properties();
+        // set properties
+        debeziumProperties.setProperty("converters", "datetime");
+        debeziumProperties.setProperty(
+                "datetime.type", MysqlDebeziumTimeConverter.class.getName());
+        debeziumProperties.setProperty("datetime.format.timestamp", "HH:mm:ss");
+        debeziumProperties.setProperty("datetime.format.default.value.convert", "false");
+        // If not set time convert maybe error
+        debeziumProperties.setProperty("database.connectionTimeZone", timezone);
+        debeziumProperties.setProperty("datetime.format.timezone", timezone);
+        LOG.info("Supplied debezium properties: {}", debeziumProperties);
+        return debeziumProperties;
+    }
+
+    /** Compares the snapshot rows of the SQL job with the expected rows, ignoring order. */
+    private void checkData(TableResult tableResult) throws Exception {
+        String[] snapshotForSingleTable =
+                new String[] {
+                    "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
+                    "+I[3, 00:00:00, null, null, 00:01:20]",
+                    "+I[2, 00:00:00, null, null, 00:00:00]"
+                };
+
+        List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
+        // The query never finishes on its own; closing the iterator releases the job.
+        try (CloseableIterator<Row> collect = tableResult.collect()) {
+            assertEqualsInAnyOrder(
+                    expectedSnapshotData, fetchRows(collect, expectedSnapshotData.size()));
+        }
+    }
+
+    /** Takes at most {@code size} rows from the iterator and returns their string forms. */
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+            size--;
+        }
+        return rows;
+    }
+
+    /** Stops the container and removes the generated configuration folder. */
+    private static void releaseResources(MySqlContainer mySqlContainer) {
+        try {
+            mySqlContainer.stop();
+        } finally {
+            tempFolder.delete();
+        }
+    }
+
+    protected MySqlContainer createMySqlContainer(String timezone) {
+        return (MySqlContainer)
+                new MySqlContainer(MySqlVersion.V5_7)
+                        .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone))
+                        .withSetupSQL("docker/setup.sql")
+                        .withDatabaseName("flink-test")
+                        .withUsername("flinkuser")
+                        .withPassword("flinkpw")
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    protected void startContainers(MySqlContainer mySqlContainer, String timezone) {
+        LOG.info("Starting containers with timezone {} ...", timezone);
+        Startables.deepStart(Stream.of(mySqlContainer)).join();
+        LOG.info("Containers are started.");
+        LOG.info("JVM System Clock Zone Id : {}", ZoneId.systemDefault());
+    }
+
+    protected UniqueDatabase getUniqueDatabase(MySqlContainer mySqlContainer) {
+        return new UniqueDatabase(mySqlContainer, "date_convert_test", "mysqluser", "mysqlpw");
+    }
+
+    /**
+     * Writes a {@code my.cnf} with row-based binlog settings and the given default time zone into
+     * a fresh folder and returns its path relative to the test resource root.
+     */
+    private String buildMySqlConfigWithTimezone(String timezone) {
+        try {
+            File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
+            Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
+            String mysqldConf =
+                    "[mysqld]\n"
+                            + "binlog_format = row\n"
+                            + "log_bin = mysql-bin\n"
+                            + "server-id = 223344\n"
+                            + "binlog_row_image = FULL\n";
+            String timezoneConf = "default-time_zone = '" + timezone + "'\n";
+            Files.write(
+                    cnf,
+                    Collections.singleton(mysqldConf + timezoneConf),
+                    StandardCharsets.UTF_8,
+                    StandardOpenOption.APPEND);
+            return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create my.cnf file.", e);
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql
new file mode 100644
index 000000000..262c1ceb1
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql
@@ -0,0 +1,36 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the `License`); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an `AS IS` BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- 
----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  date_convert_test
+-- 
----------------------------------------------------------------------------------------------------------------
+
+-- Temporal columns under test; all of them are nullable
+
+-- Fixture table: one column per temporal type handled by the converter, all nullable.
+CREATE TABLE date_convert_test
+(
+    `id`                bigint NOT NULL AUTO_INCREMENT,
+    `test_timestamp`    timestamp NULL,
+    `test_datetime`     datetime NULL,
+    `test_date`         date DEFAULT NULL,
+    `test_time`         time DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) DEFAULT CHARSET=utf8;
+
+-- Row 1 sets every column. Rows 2 and 3 leave test_datetime at its column default and set
+-- test_date to NULL; the integration test expects both to be read back as null.
+-- Row 3 passes test_time as the bare number 120, which the integration test expects to be
+-- read back as 00:01:20.
+INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, 
test_time)
+VALUES
+(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
+(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
+(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
\ No newline at end of file


Reply via email to