This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 4baee8c63 [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could not handle datetime prior to 1970 properly (#3834)
4baee8c63 is described below
commit 4baee8c6369d3cfb58ff06ae3eb2042a61e8809b
Author: yuxiqian <[email protected]>
AuthorDate: Tue Apr 8 16:19:58 2025 +0800
    [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could not handle datetime prior to 1970 properly (#3834)
---
.../source/MySqlAncientDateAndTimeITCase.java | 412 ++++++++++++++++
.../test/resources/ddl/ancient_date_and_time.sql | 124 +++++
.../docker/server-allow-ancient-date-time/my.cnf | 58 +++
.../event/DebeziumEventDeserializationSchema.java | 6 +-
.../table/RowDataDebeziumDeserializeSchema.java | 9 +-
.../source/MySqlAncientDateAndTimeITCase.java | 521 +++++++++++++++++++++
.../mysql/table/MySqlAncientDateAndTimeITCase.java | 363 ++++++++++++++
.../test/resources/ddl/ancient_date_and_time.sql | 124 +++++
.../docker/server-allow-ancient-date-time/my.cnf | 58 +++
9 files changed, 1671 insertions(+), 4 deletions(-)
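
Background for the change below: Java's `/` and `%` truncate toward zero, so splitting a negative epoch value (any instant before 1970-01-01) into milliseconds plus a sub-millisecond remainder yields a negative remainder and an off-by-one millisecond count, while Math.floorDiv / Math.floorMod round toward negative infinity and keep the remainder non-negative. A minimal, self-contained sketch of the arithmetic (illustrative only, not part of the patch; the class name is made up and the sample value is taken from the new tests):

import java.time.Instant;

public class FloorDivSketch {
    public static void main(String[] args) {
        // 1969-12-31T12:12:12.123400 UTC expressed as microseconds since the epoch.
        long micro = -42_467_876_600L;

        // Truncating split (the old code path): wrong for negative inputs.
        long badMillis = micro / 1000;                  // -42467876 (rounded toward zero)
        long badNanos = micro % 1000 * 1000;            // -600000   (negative remainder)

        // Floor-based split (the new code path): correct for any sign.
        long millis = Math.floorDiv(micro, 1000);       // -42467877
        long nanos = Math.floorMod(micro, 1000) * 1000; // 400000

        System.out.println(badMillis + " ms + " + badNanos + " ns");
        System.out.println(millis + " ms + " + nanos + " ns");
        // The floor-based pair reconstructs the original instant exactly:
        System.out.println(Instant.ofEpochMilli(millis).plusNanos(nanos)); // 1969-12-31T12:12:12.123400Z
    }
}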
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..b630b5c8b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+/** Integration tests for MySQL Table source to handle ancient date and time records. */
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+ private static final String TEST_USER = "mysqluser";
+ private static final String TEST_PASSWORD = "mysqlpw";
+
+    // We need an extra "no_zero_in_date = false" config to insert malformed date and time records.
+    private static final MySqlContainer MYSQL_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf");
+
+    private final UniqueDatabase ancientDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+ @BeforeClass
+ public static void beforeClass() {
+ LOG.info("Starting MySql container...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Container MySql is started.");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ LOG.info("Stopping MySql containers...");
+ MYSQL_CONTAINER.stop();
+ LOG.info("Container MySql is stopped.");
+ }
+
+ @Before
+ public void before() {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ ancientDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ ancientDatabase.dropDatabase();
+ }
+
+ /**
+     * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be
+ * shifted to 1971 ~ 2069.
+ */
+ @Test
+ public void testAncientDateAndTimeWithTimeAdjuster() throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+---------------------------------------------------------------------------------+
+ // | 17390 | 11323 | 11720 | 23072 | -557266 | -1
| 18261 |
+ // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ runGenericAncientDateAndTimeTest(
+ ancientDatabase,
+ true,
+ Arrays.asList(
+ "[1, 17390, 2016-07-13T17:17:17,
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123,
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450,
2010-01-19T17:17:17.123456]",
+ "[2, null, null, null, null, null, null, null, null]",
+ "[3, 11323, 2001-01-01T16:16:16,
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123,
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450,
2001-01-01T16:16:16.123456]",
+ "[4, 11720, 2002-02-02T15:15:15,
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123,
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450,
2002-02-02T15:15:15.123456]",
+ "[5, 23072, 2033-03-03T14:14:14,
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123,
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450,
2033-03-03T14:14:14.123456]",
+ "[6, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100,
1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400,
1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]",
+ "[8, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"),
+ Arrays.asList(
+ "[9, 17390, 2016-07-13T17:17:17,
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123,
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450,
2010-01-19T17:17:17.123456]",
+ "[10, null, null, null, null, null, null, null, null]",
+ "[11, 11323, 2001-01-01T16:16:16,
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123,
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450,
2001-01-01T16:16:16.123456]",
+ "[12, 11720, 2002-02-02T15:15:15,
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123,
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450,
2002-02-02T15:15:15.123456]",
+ "[13, 23072, 2033-03-03T14:14:14,
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123,
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450,
2033-03-03T14:14:14.123456]",
+ "[14, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "[15, -1, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "[16, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ @Test
+ public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+---------------------------------------------------------------------------------+
+ // | -713095 | -719162 | -718765 | -707413 | -557266 | -1
| 18261 |
+ // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ runGenericAncientDateAndTimeTest(
+ ancientDatabase,
+ false,
+ Arrays.asList(
+ "[1, -713095, 0016-07-13T17:17:17,
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123,
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450,
0010-01-19T17:17:17.123456]",
+ "[2, null, null, null, null, null, null, null, null]",
+ "[3, -719162, 0001-01-01T16:16:16,
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123,
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450,
0001-01-01T16:16:16.123456]",
+ "[4, -718765, 0002-02-02T15:15:15,
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123,
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450,
0002-02-02T15:15:15.123456]",
+ "[5, -707413, 0033-03-03T14:14:14,
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123,
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450,
0033-03-03T14:14:14.123456]",
+ "[6, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100,
1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400,
1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]",
+ "[8, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"),
+ Arrays.asList(
+ "[9, -713095, 0016-07-13T17:17:17,
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123,
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450,
0010-01-19T17:17:17.123456]",
+ "[10, null, null, null, null, null, null, null, null]",
+ "[11, -719162, 0001-01-01T16:16:16,
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123,
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450,
0001-01-01T16:16:16.123456]",
+ "[12, -718765, 0002-02-02T15:15:15,
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123,
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450,
0002-02-02T15:15:15.123456]",
+ "[13, -707413, 0033-03-03T14:14:14,
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123,
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450,
0033-03-03T14:14:14.123456]",
+ "[14, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "[15, -1, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "[16, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ private void runGenericAncientDateAndTimeTest(
+ UniqueDatabase database,
+ boolean enableTimeAdjuster,
+ List<String> expectedSnapshotResults,
+ List<String> expectedStreamingResults)
+ throws Exception {
+ Schema ancientSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("date_col", DataTypes.DATE(), null,
"0017-08-12")
+ .physicalColumn(
+ "datetime_0_col",
+ DataTypes.TIMESTAMP(0),
+ null,
+ "0016-07-13 17:17:17")
+ .physicalColumn(
+ "datetime_1_col",
+ DataTypes.TIMESTAMP(1),
+ null,
+ "0015-06-14 17:17:17.1")
+ .physicalColumn(
+ "datetime_2_col",
+ DataTypes.TIMESTAMP(2),
+ null,
+ "0014-05-15 17:17:17.12")
+ .physicalColumn(
+ "datetime_3_col",
+ DataTypes.TIMESTAMP(3),
+ null,
+ "0013-04-16 17:17:17.123")
+ .physicalColumn(
+ "datetime_4_col",
+ DataTypes.TIMESTAMP(4),
+ null,
+ "0012-03-17 17:17:17.1234")
+ .physicalColumn(
+ "datetime_5_col",
+ DataTypes.TIMESTAMP(5),
+ null,
+ "0011-02-18 17:17:17.12345")
+ .physicalColumn(
+ "datetime_6_col",
+ DataTypes.TIMESTAMP(6),
+ null,
+ "0010-01-19 17:17:17.123456")
+ .primaryKey("id")
+ .build();
+ List<RecordData.FieldGetter> ancientSchemaFieldGetters =
+ SchemaUtils.createFieldGetters(ancientSchema);
+ CreateTableEvent ancientCreateTableEvent =
+ new CreateTableEvent(
+ TableId.tableId(ancientDatabase.getDatabaseName(),
"ancient_times"),
+ ancientSchema);
+ try (CloseableIterator<Event> iterator =
+ env.fromSource(
+ getFlinkSourceProvider(
+ new String[] {"ancient_times"},
+ database,
+ enableTimeAdjuster)
+ .getSource(),
+ WatermarkStrategy.noWatermarks(),
+ "Event-Source")
+ .executeAndCollect()) {
+
+ {
+ Tuple2<List<Event>, List<CreateTableEvent>> snapshotResults =
+ fetchResultsAndCreateTableEvent(iterator,
expectedSnapshotResults.size());
+
Assertions.assertThat(snapshotResults.f1).isSubsetOf(ancientCreateTableEvent);
+ Assertions.assertThat(snapshotResults.f0)
+ .map(evt -> (DataChangeEvent) evt)
+ .map(
+ evt ->
+ SchemaUtils.restoreOriginalData(
+ evt.after(),
ancientSchemaFieldGetters)
+ .toString())
+
.containsExactlyInAnyOrderElementsOf(expectedSnapshotResults);
+ }
+
+ createBinlogEvents(ancientDatabase);
+
+ {
+ Tuple2<List<Event>, List<CreateTableEvent>> streamingResults =
+ fetchResultsAndCreateTableEvent(iterator,
expectedSnapshotResults.size());
+
Assertions.assertThat(streamingResults.f1).isSubsetOf(ancientCreateTableEvent);
+ Assertions.assertThat(streamingResults.f0)
+ .map(evt -> (DataChangeEvent) evt)
+ .map(
+ evt ->
+ SchemaUtils.restoreOriginalData(
+ evt.after(),
ancientSchemaFieldGetters)
+ .toString())
+
.containsExactlyInAnyOrderElementsOf(expectedStreamingResults);
+ }
+ }
+ }
+
+ private FlinkSourceProvider getFlinkSourceProvider(
+ String[] captureTables, UniqueDatabase database, boolean
enableTimeAdjuster) {
+ String[] captureTableIds =
+ Arrays.stream(captureTables)
+ .map(tableName -> database.getDatabaseName() + "." +
tableName)
+ .toArray(String[]::new);
+
+ Properties dbzProperties = new Properties();
+ dbzProperties.put("enable.time.adjuster",
String.valueOf(enableTimeAdjuster));
+
+ MySqlSourceConfigFactory configFactory =
+ new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList(database.getDatabaseName())
+ .tableList(captureTableIds)
+ .includeSchemaChanges(false)
+ .hostname(database.getHost())
+ .port(database.getDatabasePort())
+ .splitSize(10)
+ .fetchSize(2)
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverTimeZone(ZoneId.of("UTC").toString())
+
.serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
+ .debeziumProperties(dbzProperties);
+ return (FlinkSourceProvider) new
MySqlDataSource(configFactory).getEventSourceProvider();
+ }
+
+ private static void createBinlogEvents(UniqueDatabase database) throws
SQLException {
+ // Test reading identical data in binlog stage again
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT,\n"
+ + " DEFAULT\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0000-00-00',\n"
+ + " '0000-00-00 00:00:00',\n"
+ + " '0000-00-00 00:00:00.0',\n"
+ + " '0000-00-00 00:00:00.00',\n"
+ + " '0000-00-00 00:00:00.000',\n"
+ + " '0000-00-00 00:00:00.0000',\n"
+ + " '0000-00-00 00:00:00.00000',\n"
+ + " '0000-00-00 00:00:00.000000'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0001-01-01',\n"
+ + " '0001-01-01 16:16:16',\n"
+ + " '0001-01-01 16:16:16.1',\n"
+ + " '0001-01-01 16:16:16.12',\n"
+ + " '0001-01-01 16:16:16.123',\n"
+ + " '0001-01-01 16:16:16.1234',\n"
+ + " '0001-01-01 16:16:16.12345',\n"
+ + " '0001-01-01 16:16:16.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0002-02-02',\n"
+ + " '0002-02-02 15:15:15',\n"
+ + " '0002-02-02 15:15:15.1',\n"
+ + " '0002-02-02 15:15:15.12',\n"
+ + " '0002-02-02 15:15:15.123',\n"
+ + " '0002-02-02 15:15:15.1234',\n"
+ + " '0002-02-02 15:15:15.12345',\n"
+ + " '0002-02-02 15:15:15.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0033-03-03',\n"
+ + " '0033-03-03 14:14:14',\n"
+ + " '0033-03-03 14:14:14.1',\n"
+ + " '0033-03-03 14:14:14.12',\n"
+ + " '0033-03-03 14:14:14.123',\n"
+ + " '0033-03-03 14:14:14.1234',\n"
+ + " '0033-03-03 14:14:14.12345',\n"
+ + " '0033-03-03 14:14:14.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0444-04-04',\n"
+ + " '0444-04-04 13:13:13',\n"
+ + " '0444-04-04 13:13:13.1',\n"
+ + " '0444-04-04 13:13:13.12',\n"
+ + " '0444-04-04 13:13:13.123',\n"
+ + " '0444-04-04 13:13:13.1234',\n"
+ + " '0444-04-04 13:13:13.12345',\n"
+ + " '0444-04-04 13:13:13.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '1969-12-31',\n"
+ + " '1969-12-31 12:12:12',\n"
+ + " '1969-12-31 12:12:12.1',\n"
+ + " '1969-12-31 12:12:12.12',\n"
+ + " '1969-12-31 12:12:12.123',\n"
+ + " '1969-12-31 12:12:12.1234',\n"
+ + " '1969-12-31 12:12:12.12345',\n"
+ + " '1969-12-31 12:12:12.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '2019-12-31',\n"
+ + " '2019-12-31 23:11:11',\n"
+ + " '2019-12-31 23:11:11.1',\n"
+ + " '2019-12-31 23:11:11.12',\n"
+ + " '2019-12-31 23:11:11.123',\n"
+ + " '2019-12-31 23:11:11.1234',\n"
+ + " '2019-12-31 23:11:11.12345',\n"
+ + " '2019-12-31 23:11:11.123456'\n"
+ + ");");
+ }
+ }
+
+ public static <T> Tuple2<List<T>, List<CreateTableEvent>>
fetchResultsAndCreateTableEvent(
+ Iterator<T> iter, int size) {
+ List<T> result = new ArrayList<>(size);
+ List<CreateTableEvent> createTableEvents = new ArrayList<>();
+ while (size > 0 && iter.hasNext()) {
+ T event = iter.next();
+ if (event instanceof CreateTableEvent) {
+ createTableEvents.add((CreateTableEvent) event);
+ } else {
+ result.add(event);
+ size--;
+ }
+ }
+ return Tuple2.of(result, createTableEvents);
+ }
+}
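
A note on the two tests above: they differ only in the Debezium `enable.time.adjuster` property. With the adjuster enabled, Debezium shifts years 0001-0099 into 1971-2069, which is why the same source row '0001-01-01 16:16:16' surfaces as epoch day 11323 (2001-01-01) in the first expected list but as -719162 (0001-01-01) in the second. A small self-contained check of the LocalDate.ofEpochDay reference values quoted in the test comments (illustrative only, not part of the patch):

import java.time.LocalDate;

public class EpochDayReferenceCheck {
    public static void main(String[] args) {
        System.out.println(LocalDate.ofEpochDay(-719162)); // 0001-01-01 (adjuster disabled)
        System.out.println(LocalDate.ofEpochDay(11323));   // 2001-01-01 (adjuster shifts 0001 -> 2001)
        System.out.println(LocalDate.ofEpochDay(-557266)); // 0444-04-04 (outside 0001-0099, never shifted)
        System.out.println(LocalDate.ofEpochDay(-1));      // 1969-12-31
        System.out.println(LocalDate.ofEpochDay(18261));   // 2019-12-31
    }
}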
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql
new file mode 100644
index 000000000..93832b9b7
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql
@@ -0,0 +1,124 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE ancient_times
+(
+ id INT NOT NULL AUTO_INCREMENT,
+ date_col DATE DEFAULT '0017-08-12',
+ datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17',
+ datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1',
+ datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12',
+ datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123',
+ datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234',
+ datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345',
+ datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456',
+ PRIMARY KEY (id)
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT,
+ DEFAULT
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0000-00-00',
+ '0000-00-00 00:00:00',
+ '0000-00-00 00:00:00.0',
+ '0000-00-00 00:00:00.00',
+ '0000-00-00 00:00:00.000',
+ '0000-00-00 00:00:00.0000',
+ '0000-00-00 00:00:00.00000',
+ '0000-00-00 00:00:00.000000'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0001-01-01',
+ '0001-01-01 16:16:16',
+ '0001-01-01 16:16:16.1',
+ '0001-01-01 16:16:16.12',
+ '0001-01-01 16:16:16.123',
+ '0001-01-01 16:16:16.1234',
+ '0001-01-01 16:16:16.12345',
+ '0001-01-01 16:16:16.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0002-02-02',
+ '0002-02-02 15:15:15',
+ '0002-02-02 15:15:15.1',
+ '0002-02-02 15:15:15.12',
+ '0002-02-02 15:15:15.123',
+ '0002-02-02 15:15:15.1234',
+ '0002-02-02 15:15:15.12345',
+ '0002-02-02 15:15:15.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0033-03-03',
+ '0033-03-03 14:14:14',
+ '0033-03-03 14:14:14.1',
+ '0033-03-03 14:14:14.12',
+ '0033-03-03 14:14:14.123',
+ '0033-03-03 14:14:14.1234',
+ '0033-03-03 14:14:14.12345',
+ '0033-03-03 14:14:14.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0444-04-04',
+ '0444-04-04 13:13:13',
+ '0444-04-04 13:13:13.1',
+ '0444-04-04 13:13:13.12',
+ '0444-04-04 13:13:13.123',
+ '0444-04-04 13:13:13.1234',
+ '0444-04-04 13:13:13.12345',
+ '0444-04-04 13:13:13.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '1969-12-31',
+ '1969-12-31 12:12:12',
+ '1969-12-31 12:12:12.1',
+ '1969-12-31 12:12:12.12',
+ '1969-12-31 12:12:12.123',
+ '1969-12-31 12:12:12.1234',
+ '1969-12-31 12:12:12.12345',
+ '1969-12-31 12:12:12.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '2019-12-31',
+ '2019-12-31 23:11:11',
+ '2019-12-31 23:11:11.1',
+ '2019-12-31 23:11:11.12',
+ '2019-12-31 23:11:11.123',
+ '2019-12-31 23:11:11.1234',
+ '2019-12-31 23:11:11.12345',
+ '2019-12-31 23:11:11.123456'
+);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
new file mode 100644
index 000000000..ca0483780
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"
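
A note on the sql_mode line above: MySQL 8.0's default mode additionally includes NO_ZERO_DATE and NO_ZERO_IN_DATE, which would reject the '0000-00-00' rows that ancient_date_and_time.sql inserts; listing only STRICT_TRANS_TABLES and NO_ENGINE_SUBSTITUTION relaxes that. A hedged sketch of the kind of statement this enables, mirroring createBinlogEvents() in the test above (assumes the same UniqueDatabase "database" helper; illustrative only, not part of the patch):

// Accepted only because NO_ZERO_DATE / NO_ZERO_IN_DATE are absent from the sql_mode above.
try (Connection connection = database.getJdbcConnection();
        Statement statement = connection.createStatement()) {
    statement.execute(
            "INSERT INTO ancient_times VALUES (DEFAULT, '0000-00-00', '0000-00-00 00:00:00',"
                    + " '0000-00-00 00:00:00.0', '0000-00-00 00:00:00.00', '0000-00-00 00:00:00.000',"
                    + " '0000-00-00 00:00:00.0000', '0000-00-00 00:00:00.00000',"
                    + " '0000-00-00 00:00:00.000000');");
}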
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index f77d1aa18..ad7ec458e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -309,10 +309,12 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
                     return TimestampData.fromMillis((Long) dbzObj);
                 case MicroTimestamp.SCHEMA_NAME:
                     long micro = (long) dbzObj;
-                    return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000));
+                    return TimestampData.fromMillis(
+                            Math.floorDiv(micro, 1000), (int) (Math.floorMod(micro, 1000) * 1000));
                 case NanoTimestamp.SCHEMA_NAME:
                     long nano = (long) dbzObj;
-                    return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
+                    return TimestampData.fromMillis(
+                            Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
             }
         }
         throw new IllegalArgumentException(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 6a9642075..b68dcd171 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -481,12 +481,17 @@ public final class RowDataDebeziumDeserializeSchema
                             return TimestampData.fromEpochMillis((Long) dbzObj);
                         case MicroTimestamp.SCHEMA_NAME:
                             long micro = (long) dbzObj;
+                            // Use Math#floorDiv and Math#floorMod instead of `/` and `%`, because
+                            // timestamp number could be negative if we're handling timestamps prior
+                            // to 1970.
                             return TimestampData.fromEpochMillis(
-                                    micro / 1000, (int) (micro % 1000 * 1000));
+                                    Math.floorDiv(micro, 1000),
+                                    (int) (Math.floorMod(micro, 1000) * 1000));
                         case NanoTimestamp.SCHEMA_NAME:
                             long nano = (long) dbzObj;
                             return TimestampData.fromEpochMillis(
-                                    nano / 1000_000, (int) (nano % 1000_000));
+                                    Math.floorDiv(nano, 1000_000),
+                                    (int) (Math.floorMod(nano, 1000_000)));
                     }
                 }
                 LocalDateTime localDateTime =
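
The comment added in the hunk above states the rationale: Debezium's MicroTimestamp/NanoTimestamp values are epoch-based and therefore negative for anything before 1970, and a truncating `/` and `%` split corrupts them. A small self-contained round-trip check of the floor-based split applied in both patched deserializers (illustrative only, not part of the patch; the sample values come from the new tests):

import java.time.Instant;

public class FloorSplitRoundTrip {
    // Mirrors the patched MicroTimestamp branch: split epoch microseconds into
    // (epoch millis, nanos of millisecond) without losing sub-millisecond precision.
    static Instant fromMicros(long micro) {
        long millis = Math.floorDiv(micro, 1000);
        long nanosOfMilli = Math.floorMod(micro, 1000) * 1000;
        return Instant.ofEpochMilli(millis).plusNanos(nanosOfMilli);
    }

    public static void main(String[] args) {
        // 0001-01-01T16:16:16.123456 UTC expressed as epoch microseconds.
        System.out.println(fromMicros(-62_135_538_223_876_544L)); // 0001-01-01T16:16:16.123456Z
        // A post-1970 value splits identically under the old and new code.
        System.out.println(fromMicros(1_577_833_871_123_456L));   // 2019-12-31T23:11:11.123456Z
    }
}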
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..ca1a91b4e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowUtils;
+import org.apache.flink.util.CloseableIterator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Integration tests for MySQL Table source to handle ancient date and time records. */
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+ private static final String TEST_USER = "mysqluser";
+ private static final String TEST_PASSWORD = "mysqlpw";
+
+    // We need an extra "no_zero_in_date = false" config to insert malformed date and time records.
+    private static final MySqlContainer MYSQL_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf");
+
+    private final UniqueDatabase ancientDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+ @BeforeClass
+ public static void beforeClass() {
+ LOG.info("Starting MySql container...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Container MySql is started.");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ LOG.info("Stopping MySql containers...");
+ MYSQL_CONTAINER.stop();
+ LOG.info("Container MySql is stopped.");
+ }
+
+ @Before
+ public void before() {
+ TestValuesTableFactory.clearAllData();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200);
+ ancientDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ ancientDatabase.dropDatabase();
+ }
+
+ /**
+     * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be
+ * shifted to 1971 ~ 2069.
+ */
+ @Test
+    public void testAncientDateAndTimeWithTimeAdjusterWithRowDataDeserializer() throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+---------------------------------------------------------------------------------+
+ // | 17390 | 11323 | 11720 | 23072 | -557266 | -1
| 18261 |
+ // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ true,
+ DeserializerType.ROW_DATA,
+ Arrays.asList(
+ "+I[1, 17390, 2016-07-13T17:17:17,
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123,
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450,
2010-01-19T17:17:17.123456]",
+ "+I[2, null, null, null, null, null, null, null,
null]",
+ "+I[3, 11323, 2001-01-01T16:16:16,
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123,
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450,
2001-01-01T16:16:16.123456]",
+ "+I[4, 11720, 2002-02-02T15:15:15,
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123,
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450,
2002-02-02T15:15:15.123456]",
+ "+I[5, 23072, 2033-03-03T14:14:14,
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123,
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450,
2033-03-03T14:14:14.123456]",
+ "+I[6, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[7, -1, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[8, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ @Test
+    public void testAncientDateAndTimeWithoutTimeAdjusterWithRowDataDeserializer()
+ throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+---------------------------------------------------------------------------------+
+ // | -713095 | -719162 | -718765 | -707413 | -557266 | -1
| 18261 |
+ // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ false,
+ DeserializerType.ROW_DATA,
+ Arrays.asList(
+ "+I[1, -713095, 0016-07-13T17:17:17,
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123,
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450,
0010-01-19T17:17:17.123456]",
+ "+I[2, null, null, null, null, null, null, null,
null]",
+ "+I[3, -719162, 0001-01-01T16:16:16,
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123,
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450,
0001-01-01T16:16:16.123456]",
+ "+I[4, -718765, 0002-02-02T15:15:15,
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123,
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450,
0002-02-02T15:15:15.123456]",
+ "+I[5, -707413, 0033-03-03T14:14:14,
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123,
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450,
0033-03-03T14:14:14.123456]",
+ "+I[6, -557266, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[7, -1, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[8, 18261, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ @Test
+    public void testAncientDateAndTimeWithTimeAdjusterWithJsonDeserializer() throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+ //
+---------------------------------------------------------------------------------+
+ // | 17390 | 11323 | 11720 | 23072 | -557266 | -1
| 18261 |
+ // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ //
+ // LocalDateTime.ofEpochSecond reference:
+ //
+ // Row 1:
+ // 1468430237000 -> 2016-07-13T17:17:17
+ // 1434302237100 -> 2015-06-14T17:17:17.100
+ // 1400174237120 -> 2014-05-15T17:17:17.120
+ // 1366132637123 -> 2013-04-16T17:17:17.123
+ // 1332004637123400 -> 2012-03-17T17:17:17.123400
+ // 1298049437123450 -> 2011-02-18T17:17:17.123450
+ // 1263921437123456 -> 2010-01-19T17:17:17.123456
+ //
+ // Row 2:
+ // (null)
+ //
+ // Row 3:
+ // 978365776000 -> 2001-01-01T16:16:16
+ // 978365776100 -> 2001-01-01T16:16:16.100
+ // 978365776120 -> 2001-01-01T16:16:16.120
+ // 978365776123 -> 2001-01-01T16:16:16.123
+ // 978365776123400 -> 2001-01-01T16:16:16.123400
+ // 978365776123450 -> 2001-01-01T16:16:16.123450
+ // 978365776123456 -> 2001-01-01T16:16:16.123456
+ //
+ // Row 4:
+ // 1012662915000 -> 2002-02-02T15:15:15
+ // 1012662915100 -> 2002-02-02T15:15:15.100
+ // 1012662915120 -> 2002-02-02T15:15:15.120
+ // 1012662915123 -> 2002-02-02T15:15:15.123
+ // 1012662915123400 -> 2002-02-02T15:15:15.123400
+ // 1012662915123450 -> 2002-02-02T15:15:15.123450
+ // 1012662915123456 -> 2002-02-02T15:15:15.123456
+ //
+ // Row 5:
+ // 1993472054000 -> 2033-03-03T14:14:14
+ // 1993472054100 -> 2033-03-03T14:14:14.100
+ // 1993472054120 -> 2033-03-03T14:14:14.120
+ // 1993472054123 -> 2033-03-03T14:14:14.123
+ // 1993472054123400 -> 2033-03-03T14:14:14.123400
+ // 1993472054123450 -> 2033-03-03T14:14:14.123450
+ // 1993472054123456 -> 2033-03-03T14:14:14.123456
+ //
+ // Row 6:
+ // -48147734807000 -> 0444-04-04T13:13:13
+ // -48147734806900 -> 0444-04-04T13:13:13.100
+ // -48147734806880 -> 0444-04-04T13:13:13.120
+ // -48147734806877 -> 0444-04-04T13:13:13.123
+ // -48147734806876600 -> 0444-04-04T13:13:13.000123400
+ // -48147734806876550 -> 0444-04-04T13:13:13.000123450
+ // -48147734806876544 -> 0444-04-04T13:13:13.000123456
+ //
+ // Row 7:
+ // -42468000 -> 1969-12-31T12:12:12
+ // -42467900 -> 1969-12-31T12:12:12.100
+ // -42467880 -> 1969-12-31T12:12:12.120
+ // -42467877 -> 1969-12-31T12:12:12.123
+ // -42467876600 -> 1969-12-31T12:12:12.123400
+ // -42467876550 -> 1969-12-31T12:12:12.123450
+ // -42467876544 -> 1969-12-31T12:12:12.123456
+ //
+ // Row 8:
+ // 1577833871000 -> 2019-12-31T23:11:11
+ // 1577833871100 -> 2019-12-31T23:11:11.100
+ // 1577833871120 -> 2019-12-31T23:11:11.120
+ // 1577833871123 -> 2019-12-31T23:11:11.123
+ // 1577833871123400 -> 2019-12-31T23:11:11.123400
+ // 1577833871123450 -> 2019-12-31T23:11:11.123450
+ // 1577833871123456 -> 2019-12-31T23:11:11.123456
+
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ true,
+ DeserializerType.JSON,
+ Arrays.asList(
+
"{\"id\":\"AQ==\",\"date_col\":17390,\"datetime_0_col\":1468430237000,\"datetime_1_col\":1434302237100,\"datetime_2_col\":1400174237120,\"datetime_3_col\":1366132637123,\"datetime_4_col\":1332004637123400,\"datetime_5_col\":1298049437123450,\"datetime_6_col\":1263921437123456}",
+
"{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}",
+
"{\"id\":\"Aw==\",\"date_col\":11323,\"datetime_0_col\":978365776000,\"datetime_1_col\":978365776100,\"datetime_2_col\":978365776120,\"datetime_3_col\":978365776123,\"datetime_4_col\":978365776123400,\"datetime_5_col\":978365776123450,\"datetime_6_col\":978365776123456}",
+
"{\"id\":\"BA==\",\"date_col\":11720,\"datetime_0_col\":1012662915000,\"datetime_1_col\":1012662915100,\"datetime_2_col\":1012662915120,\"datetime_3_col\":1012662915123,\"datetime_4_col\":1012662915123400,\"datetime_5_col\":1012662915123450,\"datetime_6_col\":1012662915123456}",
+
"{\"id\":\"BQ==\",\"date_col\":23072,\"datetime_0_col\":1993472054000,\"datetime_1_col\":1993472054100,\"datetime_2_col\":1993472054120,\"datetime_3_col\":1993472054123,\"datetime_4_col\":1993472054123400,\"datetime_5_col\":1993472054123450,\"datetime_6_col\":1993472054123456}",
+
"{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}",
+
"{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}",
+
"{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}"));
+ }
+
+ @Test
+    public void testAncientDateAndTimeWithoutTimeAdjusterWithJsonDeserializer() throws Exception {
+ // LocalDate.ofEpochDay reference:
+ //
+ //
+---------------------------------------------------------------------------------+
+ // | -713095 | -719162 | -718765 | -707413 | -557266 | -1
| 18261 |
+ // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 |
1969/12/31 | 2019/12/31 |
+ //
+---------------------------------------------------------------------------------+
+ //
+ // LocalDateTime.ofEpochSecond reference:
+ //
+ // Row 1:
+ // -61645473763000 -> 0016-07-13T17:17:17
+ // -61679601762900 -> 0015-06-14T17:17:17.100
+ // -61713729762880 -> 0014-05-15T17:17:17.120
+ // -61747771362877 -> 0013-04-16T17:17:17.123
+ // -61781899362876600 -> 0012-03-17T17:17:17.123400
+ // -61815854562876550 -> 0011-02-18T17:17:17.123450
+ // -61849982562876544 -> 0010-01-19T17:17:17.123456
+ //
+ // Row 2:
+ // (null)
+ //
+ // Row 3:
+ // -62135538224000 -> 0001-01-01T16:16:16
+ // -62135538223900 -> 0001-01-01T16:16:16.100
+ // -62135538223880 -> 0001-01-01T16:16:16.120
+ // -62135538223877 -> 0001-01-01T16:16:16.123
+ // -62135538223876600 -> 0001-01-01T16:16:16.123400
+ // -62135538223876550 -> 0001-01-01T16:16:16.123450
+ // -62135538223876544 -> 0001-01-01T16:16:16.123456
+ //
+ // Row 4:
+ // -62101241085000 -> 0002-02-02T15:15:15
+ // -62101241084900 -> 0002-02-02T15:15:15.100
+ // -62101241084880 -> 0002-02-02T15:15:15.120
+ // -62101241084877 -> 0002-02-02T15:15:15.123
+ // -62101241084876600 -> 0002-02-02T15:15:15.123400
+ // -62101241084876550 -> 0002-02-02T15:15:15.123450
+ // -62101241084876544 -> 0002-02-02T15:15:15.123456
+ //
+ // Row 5:
+ // -61120431946000 -> 0033-03-03T14:14:14
+ // -61120431945900 -> 0033-03-03T14:14:14.100
+ // -61120431945880 -> 0033-03-03T14:14:14.120
+ // -61120431945877 -> 0033-03-03T14:14:14.123
+ // -61120431945876600 -> 0033-03-03T14:14:14.123400
+ // -61120431945876550 -> 0033-03-03T14:14:14.123450
+ // -61120431945876544 -> 0033-03-03T14:14:14.123456
+ //
+ //
+ // Row 6:
+ // -48147734807000 -> 0444-04-04T13:13:13
+ // -48147734806900 -> 0444-04-04T13:13:13.100
+ // -48147734806880 -> 0444-04-04T13:13:13.120
+ // -48147734806877 -> 0444-04-04T13:13:13.123
+ // -48147734806876600 -> 0444-04-04T13:13:13.000123400
+ // -48147734806876550 -> 0444-04-04T13:13:13.000123450
+ // -48147734806876544 -> 0444-04-04T13:13:13.000123456
+ //
+ // Row 7:
+ // -42468000 -> 1969-12-31T12:12:12
+ // -42467900 -> 1969-12-31T12:12:12.100
+ // -42467880 -> 1969-12-31T12:12:12.120
+ // -42467877 -> 1969-12-31T12:12:12.123
+ // -42467876600 -> 1969-12-31T12:12:12.123400
+ // -42467876550 -> 1969-12-31T12:12:12.123450
+ // -42467876544 -> 1969-12-31T12:12:12.123456
+ //
+ // Row 8:
+ // 1577833871000 -> 2019-12-31T23:11:11
+ // 1577833871100 -> 2019-12-31T23:11:11.100
+ // 1577833871120 -> 2019-12-31T23:11:11.120
+ // 1577833871123 -> 2019-12-31T23:11:11.123
+ // 1577833871123400 -> 2019-12-31T23:11:11.123400
+ // 1577833871123450 -> 2019-12-31T23:11:11.123450
+ // 1577833871123456 -> 2019-12-31T23:11:11.123456
+
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ false,
+ DeserializerType.JSON,
+ Arrays.asList(
+
"{\"id\":\"AQ==\",\"date_col\":-713095,\"datetime_0_col\":-61645473763000,\"datetime_1_col\":-61679601762900,\"datetime_2_col\":-61713729762880,\"datetime_3_col\":-61747771362877,\"datetime_4_col\":-61781899362876600,\"datetime_5_col\":-61815854562876550,\"datetime_6_col\":-61849982562876544}",
+
"{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}",
+
"{\"id\":\"Aw==\",\"date_col\":-719162,\"datetime_0_col\":-62135538224000,\"datetime_1_col\":-62135538223900,\"datetime_2_col\":-62135538223880,\"datetime_3_col\":-62135538223877,\"datetime_4_col\":-62135538223876600,\"datetime_5_col\":-62135538223876550,\"datetime_6_col\":-62135538223876544}",
+
"{\"id\":\"BA==\",\"date_col\":-718765,\"datetime_0_col\":-62101241085000,\"datetime_1_col\":-62101241084900,\"datetime_2_col\":-62101241084880,\"datetime_3_col\":-62101241084877,\"datetime_4_col\":-62101241084876600,\"datetime_5_col\":-62101241084876550,\"datetime_6_col\":-62101241084876544}",
+
"{\"id\":\"BQ==\",\"date_col\":-707413,\"datetime_0_col\":-61120431946000,\"datetime_1_col\":-61120431945900,\"datetime_2_col\":-61120431945880,\"datetime_3_col\":-61120431945877,\"datetime_4_col\":-61120431945876600,\"datetime_5_col\":-61120431945876550,\"datetime_6_col\":-61120431945876544}",
+
"{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}",
+
"{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}",
+
"{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}"));
+ }
+
+ private void runGenericAncientDateAndTimeTest(
+ MySqlContainer container,
+ UniqueDatabase database,
+ boolean enableTimeAdjuster,
+ DeserializerType deserializerType,
+ List<String> expectedResults)
+ throws Exception {
+
+ switch (deserializerType) {
+ case ROW_DATA:
+ {
+ // Build deserializer
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD("date_col",
DataTypes.DATE()),
+ DataTypes.FIELD("datetime_0_col",
DataTypes.TIMESTAMP(0)),
+ DataTypes.FIELD("datetime_1_col",
DataTypes.TIMESTAMP(1)),
+ DataTypes.FIELD("datetime_2_col",
DataTypes.TIMESTAMP(2)),
+ DataTypes.FIELD("datetime_3_col",
DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("datetime_4_col",
DataTypes.TIMESTAMP(4)),
+ DataTypes.FIELD("datetime_5_col",
DataTypes.TIMESTAMP(5)),
+ DataTypes.FIELD("datetime_6_col",
DataTypes.TIMESTAMP(6)));
+ LogicalType logicalType =
TypeConversions.fromDataToLogicalType(dataType);
+ InternalTypeInfo<RowData> typeInfo =
InternalTypeInfo.of(logicalType);
+ RowDataDebeziumDeserializeSchema deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType((RowType)
dataType.getLogicalType())
+ .setResultTypeInfo(typeInfo)
+ .build();
+
+ Properties dbzProperties = new Properties();
+ dbzProperties.put("enable.time.adjuster",
String.valueOf(enableTimeAdjuster));
+ // Build source
+ MySqlSource<RowData> mySqlSource =
+ MySqlSource.<RowData>builder()
+ .hostname(container.getHost())
+ .port(container.getDatabasePort())
+ .databaseList(database.getDatabaseName())
+ .serverTimeZone("UTC")
+ .tableList(database.getDatabaseName() +
".ancient_times")
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverId(getServerId())
+ .deserializer(deserializer)
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(dbzProperties)
+ .build();
+
+ try (CloseableIterator<RowData> iterator =
+ env.fromSource(
+ mySqlSource,
+ WatermarkStrategy.noWatermarks(),
+ "Fetch results")
+ .executeAndCollect()) {
+ List<RowData> results = fetchRows(iterator,
expectedResults.size());
+
Assertions.assertThat(convertRowDataToRowString(results))
+
.containsExactlyInAnyOrderElementsOf(expectedResults);
+ }
+ }
+ break;
+ case JSON:
+ {
+ JsonDebeziumDeserializationSchema deserializer =
+ new JsonDebeziumDeserializationSchema();
+
+ Properties dbzProperties = new Properties();
+ dbzProperties.put("enable.time.adjuster",
String.valueOf(enableTimeAdjuster));
+ // Build source
+ MySqlSource<String> mySqlSource =
+ MySqlSource.<String>builder()
+ .hostname(container.getHost())
+ .port(container.getDatabasePort())
+ .databaseList(database.getDatabaseName())
+ .serverTimeZone("UTC")
+ .tableList(database.getDatabaseName() +
".ancient_times")
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverId(getServerId())
+ .deserializer(deserializer)
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(dbzProperties)
+ .build();
+
+ try (CloseableIterator<String> iterator =
+ env.fromSource(
+ mySqlSource,
+ WatermarkStrategy.noWatermarks(),
+ "Fetch results")
+ .executeAndCollect()) {
+ List<String> results = fetchRows(iterator,
expectedResults.size());
+ Assertions.assertThat(convertJsonToRowString(results))
+
.containsExactlyInAnyOrderElementsOf(expectedResults);
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown deserializer type: " + deserializerType);
+ }
+ }
+
+ private static <T> List<T> fetchRows(Iterator<T> iter, int size) {
+ List<T> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ T row = iter.next();
+ rows.add(row);
+ size--;
+ }
+ return rows;
+ }
+
+ private static List<String> convertRowDataToRowString(List<RowData> rows) {
+ LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+ map.put("id_col", 0);
+ map.put("date_col", 1);
+ map.put("datetime_0_col", 2);
+ map.put("datetime_1_col", 3);
+ map.put("datetime_2_col", 4);
+ map.put("datetime_3_col", 5);
+ map.put("datetime_4_col", 6);
+ map.put("datetime_5_col", 7);
+ map.put("datetime_6_col", 8);
+ return rows.stream()
+ .map(
+ row ->
+ RowUtils.createRowWithNamedPositions(
+ row.getRowKind(),
+ new Object[] {
+ wrap(row, 0,
RowData::getInt),
+ wrap(row, 1,
RowData::getInt),
+ wrap(row, 2, (r, i) ->
r.getTimestamp(i, 0)),
+ wrap(row, 3, (r, i) ->
r.getTimestamp(i, 1)),
+ wrap(row, 4, (r, i) ->
r.getTimestamp(i, 2)),
+ wrap(row, 5, (r, i) ->
r.getTimestamp(i, 3)),
+ wrap(row, 6, (r, i) ->
r.getTimestamp(i, 4)),
+ wrap(row, 7, (r, i) ->
r.getTimestamp(i, 5)),
+ wrap(row, 8, (r, i) ->
r.getTimestamp(i, 6))
+ },
+ map)
+ .toString())
+ .collect(Collectors.toList());
+ }
+
+ private static List<String> convertJsonToRowString(List<String> rows) {
+ ObjectMapper mapper = new ObjectMapper();
+ return rows.stream()
+ .map(
+ row -> {
+ try {
+ JsonNode node = mapper.readTree(row);
+ return node.get("after").toString();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static <T> Object wrap(RowData row, int index, BiFunction<RowData,
Integer, T> getter) {
+ if (row.isNullAt(index)) {
+ return null;
+ }
+ return getter.apply(row, index);
+ }
+
+ private String getServerId() {
+ final Random random = new Random();
+ int serverId = random.nextInt(100) + 5400;
+ return serverId + "-" + (serverId + env.getParallelism());
+ }
+
+ enum DeserializerType {
+ JSON,
+ ROW_DATA
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..b9ceaa783
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/** Integration tests for MySQL Table source to handle ancient date and time records. */
+@RunWith(Parameterized.class)
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+ private static final String TEST_USER = "mysqluser";
+ private static final String TEST_PASSWORD = "mysqlpw";
+
+ // We need a relaxed sql_mode (without NO_ZERO_IN_DATE / NO_ZERO_DATE) to insert malformed date and time records.
+ private static final MySqlContainer MYSQL_CONTAINER =
+ createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf");
+
+ private final UniqueDatabase ancientDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD);
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ private final StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(
+ env, EnvironmentSettings.newInstance().inStreamingMode().build());
+
+ private final boolean incrementalSnapshot;
+
+ @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
+ public static Object[] parameters() {
+ return new Object[][] {new Object[] {false}, new Object[] {true}};
+ }
+
+ public MySqlAncientDateAndTimeITCase(boolean incrementalSnapshot) {
+ this.incrementalSnapshot = incrementalSnapshot;
+ }
+
+ @BeforeClass
+ public static void beforeClass() {
+ LOG.info("Starting MySql container...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Container MySql is started.");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ LOG.info("Stopping MySql containers...");
+ MYSQL_CONTAINER.stop();
+ LOG.info("Container MySql is stopped.");
+ }
+
+ @Before
+ public void before() {
+ TestValuesTableFactory.clearAllData();
+ if (incrementalSnapshot) {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200);
+ } else {
+ env.setParallelism(1);
+ }
+ ancientDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ ancientDatabase.dropDatabase();
+ }
+
+ /**
+ * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be
+ * shifted to 1971 ~ 2069.
+ */
+ @Test
+ public void testAncientDateAndTimeWithTimeAdjuster() throws Exception {
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ incrementalSnapshot,
+ true,
+ Arrays.asList(
+ "+I[1, 2017-08-12, 2016-07-13T17:17:17,
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123,
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450,
2010-01-19T17:17:17.123456]",
+ "+I[2, null, null, null, null, null, null, null,
null]",
+ "+I[3, 2001-01-01, 2001-01-01T16:16:16,
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123,
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450,
2001-01-01T16:16:16.123456]",
+ "+I[4, 2002-02-02, 2002-02-02T15:15:15,
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123,
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450,
2002-02-02T15:15:15.123456]",
+ "+I[5, 2033-03-03, 2033-03-03T14:14:14,
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123,
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450,
2033-03-03T14:14:14.123456]",
+ "+I[6, 0444-04-04, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[7, 1969-12-31, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[8, 2019-12-31, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"),
+ Arrays.asList(
+ "+I[9, 2017-08-12, 2016-07-13T17:17:17,
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123,
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450,
2010-01-19T17:17:17.123456]",
+ "+I[10, null, null, null, null, null, null, null,
null]",
+ "+I[11, 2001-01-01, 2001-01-01T16:16:16,
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123,
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450,
2001-01-01T16:16:16.123456]",
+ "+I[12, 2002-02-02, 2002-02-02T15:15:15,
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123,
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450,
2002-02-02T15:15:15.123456]",
+ "+I[13, 2033-03-03, 2033-03-03T14:14:14,
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123,
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450,
2033-03-03T14:14:14.123456]",
+ "+I[14, 0444-04-04, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[15, 1969-12-31, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[16, 2019-12-31, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ @Test
+ public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception {
+ runGenericAncientDateAndTimeTest(
+ MYSQL_CONTAINER,
+ ancientDatabase,
+ incrementalSnapshot,
+ false,
+ Arrays.asList(
+ "+I[1, 0017-08-12, 0016-07-13T17:17:17,
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123,
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450,
0010-01-19T17:17:17.123456]",
+ "+I[2, null, null, null, null, null, null, null,
null]",
+ "+I[3, 0001-01-01, 0001-01-01T16:16:16,
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123,
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450,
0001-01-01T16:16:16.123456]",
+ "+I[4, 0002-02-02, 0002-02-02T15:15:15,
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123,
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450,
0002-02-02T15:15:15.123456]",
+ "+I[5, 0033-03-03, 0033-03-03T14:14:14,
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123,
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450,
0033-03-03T14:14:14.123456]",
+ "+I[6, 0444-04-04, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[7, 1969-12-31, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[8, 2019-12-31, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"),
+ Arrays.asList(
+ "+I[9, 0017-08-12, 0016-07-13T17:17:17,
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123,
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450,
0010-01-19T17:17:17.123456]",
+ "+I[10, null, null, null, null, null, null, null,
null]",
+ "+I[11, 0001-01-01, 0001-01-01T16:16:16,
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123,
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450,
0001-01-01T16:16:16.123456]",
+ "+I[12, 0002-02-02, 0002-02-02T15:15:15,
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123,
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450,
0002-02-02T15:15:15.123456]",
+ "+I[13, 0033-03-03, 0033-03-03T14:14:14,
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123,
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450,
0033-03-03T14:14:14.123456]",
+ "+I[14, 0444-04-04, 0444-04-04T13:13:13,
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123,
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450,
0444-04-04T13:13:13.123456]",
+ "+I[15, 1969-12-31, 1969-12-31T12:12:12,
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123,
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450,
1969-12-31T12:12:12.123456]",
+ "+I[16, 2019-12-31, 2019-12-31T23:11:11,
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123,
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450,
2019-12-31T23:11:11.123456]"));
+ }
+
+ private void runGenericAncientDateAndTimeTest(
+ MySqlContainer container,
+ UniqueDatabase database,
+ boolean incrementalSnapshot,
+ boolean enableTimeAdjuster,
+ List<String> expectedSnapshotResults,
+ List<String> expectedStreamingResults)
+ throws Exception {
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE ancient_db ("
+ + " `id` INT NOT NULL,"
+ + " date_col DATE,"
+ + " datetime_0_col TIMESTAMP(0),"
+ + " datetime_1_col TIMESTAMP(1),"
+ + " datetime_2_col TIMESTAMP(2),"
+ + " datetime_3_col TIMESTAMP(3),"
+ + " datetime_4_col TIMESTAMP(4),"
+ + " datetime_5_col TIMESTAMP(5),"
+ + " datetime_6_col TIMESTAMP(6),"
+ + " primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'server-time-zone' = 'UTC',"
+ + " 'server-id' = '%s',"
+ + " 'debezium.enable.time.adjuster' = '%s'"
+ + ")",
+ container.getHost(),
+ container.getDatabasePort(),
+ TEST_USER,
+ TEST_PASSWORD,
+ database.getDatabaseName(),
+ "ancient_times",
+ incrementalSnapshot,
+ getServerId(),
+ enableTimeAdjuster);
+
+ tEnv.executeSql(sourceDDL);
+
+ TableResult result = tEnv.executeSql("SELECT * FROM ancient_db");
+ do {
+ Thread.sleep(5000L);
+ } while (result.getJobClient().get().getJobStatus().get() != RUNNING);
+
+ CloseableIterator<Row> iterator = result.collect();
+
+ List<String> expectedRows = new ArrayList<>(expectedSnapshotResults);
+
+ Assertions.assertThat(fetchRows(iterator, expectedRows.size()))
+ .containsExactlyInAnyOrderElementsOf(expectedRows);
+
+ createBinlogEvents(database);
+
+ Assertions.assertThat(fetchRows(iterator, expectedStreamingResults.size()))
+ .containsExactlyInAnyOrderElementsOf(expectedStreamingResults);
+ result.getJobClient().get().cancel().get();
+ }
+
+ private static void createBinlogEvents(UniqueDatabase database) throws SQLException {
+ // Test reading identical data in binlog stage again
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0017-08-12',\n"
+ + " '0016-07-13 17:17:17',\n"
+ + " '0015-06-14 17:17:17.1',\n"
+ + " '0014-05-15 17:17:17.12',\n"
+ + " '0013-04-16 17:17:17.123',\n"
+ + " '0012-03-17 17:17:17.1234',\n"
+ + " '0011-02-18 17:17:17.12345',\n"
+ + " '0010-01-19 17:17:17.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0000-00-00',\n"
+ + " '0000-00-00 00:00:00',\n"
+ + " '0000-00-00 00:00:00.0',\n"
+ + " '0000-00-00 00:00:00.00',\n"
+ + " '0000-00-00 00:00:00.000',\n"
+ + " '0000-00-00 00:00:00.0000',\n"
+ + " '0000-00-00 00:00:00.00000',\n"
+ + " '0000-00-00 00:00:00.000000'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0001-01-01',\n"
+ + " '0001-01-01 16:16:16',\n"
+ + " '0001-01-01 16:16:16.1',\n"
+ + " '0001-01-01 16:16:16.12',\n"
+ + " '0001-01-01 16:16:16.123',\n"
+ + " '0001-01-01 16:16:16.1234',\n"
+ + " '0001-01-01 16:16:16.12345',\n"
+ + " '0001-01-01 16:16:16.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0002-02-02',\n"
+ + " '0002-02-02 15:15:15',\n"
+ + " '0002-02-02 15:15:15.1',\n"
+ + " '0002-02-02 15:15:15.12',\n"
+ + " '0002-02-02 15:15:15.123',\n"
+ + " '0002-02-02 15:15:15.1234',\n"
+ + " '0002-02-02 15:15:15.12345',\n"
+ + " '0002-02-02 15:15:15.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0033-03-03',\n"
+ + " '0033-03-03 14:14:14',\n"
+ + " '0033-03-03 14:14:14.1',\n"
+ + " '0033-03-03 14:14:14.12',\n"
+ + " '0033-03-03 14:14:14.123',\n"
+ + " '0033-03-03 14:14:14.1234',\n"
+ + " '0033-03-03 14:14:14.12345',\n"
+ + " '0033-03-03 14:14:14.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '0444-04-04',\n"
+ + " '0444-04-04 13:13:13',\n"
+ + " '0444-04-04 13:13:13.1',\n"
+ + " '0444-04-04 13:13:13.12',\n"
+ + " '0444-04-04 13:13:13.123',\n"
+ + " '0444-04-04 13:13:13.1234',\n"
+ + " '0444-04-04 13:13:13.12345',\n"
+ + " '0444-04-04 13:13:13.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '1969-12-31',\n"
+ + " '1969-12-31 12:12:12',\n"
+ + " '1969-12-31 12:12:12.1',\n"
+ + " '1969-12-31 12:12:12.12',\n"
+ + " '1969-12-31 12:12:12.123',\n"
+ + " '1969-12-31 12:12:12.1234',\n"
+ + " '1969-12-31 12:12:12.12345',\n"
+ + " '1969-12-31 12:12:12.123456'\n"
+ + ");");
+ statement.execute(
+ "INSERT INTO ancient_times VALUES (\n"
+ + " DEFAULT,\n"
+ + " '2019-12-31',\n"
+ + " '2019-12-31 23:11:11',\n"
+ + " '2019-12-31 23:11:11.1',\n"
+ + " '2019-12-31 23:11:11.12',\n"
+ + " '2019-12-31 23:11:11.123',\n"
+ + " '2019-12-31 23:11:11.1234',\n"
+ + " '2019-12-31 23:11:11.12345',\n"
+ + " '2019-12-31 23:11:11.123456'\n"
+ + ");");
+ }
+ }
+
+ private static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ size--;
+ }
+ return rows;
+ }
+
+ private String getServerId() {
+ final Random random = new Random();
+ int serverId = random.nextInt(100) + 5400;
+ if (incrementalSnapshot) {
+ return serverId + "-" + (serverId + env.getParallelism());
+ }
+ return String.valueOf(serverId);
+ }
+}
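
Note on the two expectation sets in the test above: the "with time adjuster" variant encodes Debezium's two-digit-year style pivot for years 0001-0099, while the "without" variant keeps the stored ancient years untouched. The following is a minimal illustrative sketch of that pivot, not part of this commit; the class name is invented, and the 70-99 half of the split is assumed from the usual two-digit-year pivot (only the <= 69 half is actually exercised by the rows above).

    import java.time.LocalDate;

    public class TimeAdjusterSketch {
        // Years 0001-0069 are read as 2001-2069 and 0070-0099 as 1970-1999;
        // anything from year 0100 onwards passes through unchanged.
        static LocalDate adjust(LocalDate date) {
            int year = date.getYear();
            if (year >= 1 && year <= 69) {
                return date.withYear(year + 2000);
            }
            if (year >= 70 && year <= 99) {
                return date.withYear(year + 1900);
            }
            return date;
        }

        public static void main(String[] args) {
            System.out.println(adjust(LocalDate.of(17, 8, 12)));  // 2017-08-12, as in row 1 above
            System.out.println(adjust(LocalDate.of(444, 4, 4)));  // 0444-04-04, left as-is
        }
    }

Setting 'debezium.enable.time.adjuster' = 'false' in the table options above is what lets the second expectation set see the raw 00xx years.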
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
new file mode 100644
index 000000000..e4ff4e595
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
@@ -0,0 +1,124 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE ancient_times
+(
+ id SERIAL,
+ date_col DATE,
+ datetime_0_col DATETIME(0),
+ datetime_1_col DATETIME(1),
+ datetime_2_col DATETIME(2),
+ datetime_3_col DATETIME(3),
+ datetime_4_col DATETIME(4),
+ datetime_5_col DATETIME(5),
+ datetime_6_col DATETIME(6),
+ PRIMARY KEY (id)
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0017-08-12',
+ '0016-07-13 17:17:17',
+ '0015-06-14 17:17:17.1',
+ '0014-05-15 17:17:17.12',
+ '0013-04-16 17:17:17.123',
+ '0012-03-17 17:17:17.1234',
+ '0011-02-18 17:17:17.12345',
+ '0010-01-19 17:17:17.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0000-00-00',
+ '0000-00-00 00:00:00',
+ '0000-00-00 00:00:00.0',
+ '0000-00-00 00:00:00.00',
+ '0000-00-00 00:00:00.000',
+ '0000-00-00 00:00:00.0000',
+ '0000-00-00 00:00:00.00000',
+ '0000-00-00 00:00:00.000000'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0001-01-01',
+ '0001-01-01 16:16:16',
+ '0001-01-01 16:16:16.1',
+ '0001-01-01 16:16:16.12',
+ '0001-01-01 16:16:16.123',
+ '0001-01-01 16:16:16.1234',
+ '0001-01-01 16:16:16.12345',
+ '0001-01-01 16:16:16.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0002-02-02',
+ '0002-02-02 15:15:15',
+ '0002-02-02 15:15:15.1',
+ '0002-02-02 15:15:15.12',
+ '0002-02-02 15:15:15.123',
+ '0002-02-02 15:15:15.1234',
+ '0002-02-02 15:15:15.12345',
+ '0002-02-02 15:15:15.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0033-03-03',
+ '0033-03-03 14:14:14',
+ '0033-03-03 14:14:14.1',
+ '0033-03-03 14:14:14.12',
+ '0033-03-03 14:14:14.123',
+ '0033-03-03 14:14:14.1234',
+ '0033-03-03 14:14:14.12345',
+ '0033-03-03 14:14:14.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '0444-04-04',
+ '0444-04-04 13:13:13',
+ '0444-04-04 13:13:13.1',
+ '0444-04-04 13:13:13.12',
+ '0444-04-04 13:13:13.123',
+ '0444-04-04 13:13:13.1234',
+ '0444-04-04 13:13:13.12345',
+ '0444-04-04 13:13:13.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '1969-12-31',
+ '1969-12-31 12:12:12',
+ '1969-12-31 12:12:12.1',
+ '1969-12-31 12:12:12.12',
+ '1969-12-31 12:12:12.123',
+ '1969-12-31 12:12:12.1234',
+ '1969-12-31 12:12:12.12345',
+ '1969-12-31 12:12:12.123456'
+);
+
+INSERT INTO ancient_times VALUES (
+ DEFAULT,
+ '2019-12-31',
+ '2019-12-31 23:11:11',
+ '2019-12-31 23:11:11.1',
+ '2019-12-31 23:11:11.12',
+ '2019-12-31 23:11:11.123',
+ '2019-12-31 23:11:11.1234',
+ '2019-12-31 23:11:11.12345',
+ '2019-12-31 23:11:11.123456'
+);
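
The '0000-00-00' rows in this script only insert cleanly when NO_ZERO_DATE and NO_ZERO_IN_DATE are left out of sql_mode, which the my.cnf in the next file arranges. A hypothetical sanity check against a running test container might look like the sketch below; it is not part of this commit, the JDBC URL is a placeholder, and only the user/password match the test constants above.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SqlModeCheck {
        public static void main(String[] args) throws Exception {
            // Placeholder URL; the ITCases resolve host and port from the Testcontainers instance.
            try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/", "mysqluser", "mysqlpw");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT @@GLOBAL.sql_mode")) {
                rs.next();
                String mode = rs.getString(1);
                // Both should print false; otherwise the zero-date INSERTs above are rejected
                // under STRICT_TRANS_TABLES.
                System.out.println("NO_ZERO_IN_DATE: " + mode.contains("NO_ZERO_IN_DATE"));
                System.out.println("NO_ZERO_DATE: " + mode.contains("NO_ZERO_DATE"));
            }
        }
    }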
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
new file mode 100644
index 000000000..ca0483780
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"
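
Since the streaming phase of these tests reads row-format binlog events, a quick way to confirm the settings above are active on the container is a SHOW VARIABLES query. The sketch below is illustrative only (not part of this commit), with the same placeholder connection details as before.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class BinlogSettingsCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/", "mysqluser", "mysqlpw");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(
                            "SHOW VARIABLES WHERE Variable_name IN"
                                    + " ('log_bin', 'binlog_format', 'server_id')")) {
                // Expected with this my.cnf: log_bin=ON, binlog_format=ROW, server_id=223344
                while (rs.next()) {
                    System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
                }
            }
        }
    }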