This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 25a5817cd [FLINK-38248] The microseconds default value of '0000-00-00
00:00:00.000000' for MySQL TIMESTAMP fields is not supported in downstream
systems. (#4251)
25a5817cd is described below
commit 25a5817cd095bfe7329cf9e3f36f232616bec358
Author: Patrick <[email protected]>
AuthorDate: Mon Mar 30 13:39:53 2026 +0800
[FLINK-38248] The microseconds default value of '0000-00-00
00:00:00.000000' for MySQL TIMESTAMP fields is not supported in downstream
systems. (#4251)
---
.../doris/sink/DorisMetadataApplier.java | 2 +-
.../doris/sink/DorisMetadataApplierITCase.java | 102 +++++++++++++++++++++
.../paimon/sink/SchemaChangeProvider.java | 2 +-
.../paimon/sink/PaimonMetadataApplierTest.java | 54 +++++++++++
.../sink/StarRocksMetadataApplierITCase.java | 101 ++++++++++++++++++++
5 files changed, 259 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index afb01cc8e..de03dbc53 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -335,7 +335,7 @@ public class DorisMetadataApplier implements
MetadataApplier {
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType) {
- if
(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+ if
(defaultValue.startsWith(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) {
return DorisSchemaUtils.DEFAULT_DATETIME;
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 43384f7e7..b8c48d7a1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -630,6 +630,108 @@ class DorisMetadataApplierITCase extends
DorisSinkTestBase {
assertEqualsInOrder(expected, actual);
}
+ /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+ private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00
00:00:00.000000";
+
+ @ParameterizedTest(name = "batchMode: {0}")
+ @ValueSource(booleans = {true, false})
+ void testMysqlDefaultTimestampValueWithMicrosInCreateTable(boolean
batchMode) throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME,
DorisContainer.DORIS_TABLE_NAME);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(50), null))
+ .column(
+ new PhysicalColumn(
+ "created_time",
+ DataTypes.TIMESTAMP(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS))
+ .column(
+ new PhysicalColumn(
+ "updated_time",
+ DataTypes.TIMESTAMP_LTZ(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS))
+ .primaryKey("id")
+ .build();
+
+ runJobWithEvents(
+ Collections.singletonList(new CreateTableEvent(tableId,
schema)), batchMode);
+
+ List<String> actual = inspectTableSchema(tableId);
+
+ List<String> expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "name | VARCHAR(150) | Yes | false | null",
+ "created_time | DATETIME(6) | Yes | false | "
+ + DorisSchemaUtils.DEFAULT_DATETIME,
+ "updated_time | DATETIME(6) | Yes | false | "
+ + DorisSchemaUtils.DEFAULT_DATETIME);
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @ParameterizedTest(name = "batchMode: {0}")
+ @ValueSource(booleans = {true, false})
+ void testMysqlDefaultTimestampValueWithMicrosInAddColumn(boolean
batchMode) throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME,
DorisContainer.DORIS_TABLE_NAME);
+
+ Schema initialSchema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(50), null))
+ .primaryKey("id")
+ .build();
+
+ List<Event> events = new ArrayList<>();
+ events.add(new CreateTableEvent(tableId, initialSchema));
+
+ PhysicalColumn createdTimeCol =
+ new PhysicalColumn(
+ "created_time", DataTypes.TIMESTAMP(6), null,
INVALID_DATETIME_WITH_MICROS);
+
+ PhysicalColumn updatedTimeCol =
+ new PhysicalColumn(
+ "updated_time",
+ DataTypes.TIMESTAMP_LTZ(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS);
+
+ events.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+ events.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+ runJobWithEvents(events, batchMode);
+
+ List<String> actual = inspectTableSchema(tableId);
+
+ List<String> expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "name | VARCHAR(150) | Yes | false | null",
+ "created_time | DATETIME(6) | Yes | false | "
+ + DorisSchemaUtils.DEFAULT_DATETIME,
+ "updated_time | DATETIME(6) | Yes | false | "
+ + DorisSchemaUtils.DEFAULT_DATETIME);
+
+ assertEqualsInOrder(expected, actual);
+ }
+
private void runJobWithEvents(List<Event> events, boolean batchMode)
throws Exception {
DataStream<Event> stream = env.fromData(events, new
EventTypeInfo()).setParallelism(1);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
index 22ee7ade1..5e8beacc9 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -161,7 +161,7 @@ public class SchemaChangeProvider {
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType) {
- if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+ if (defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
return DEFAULT_DATETIME;
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 7f0ed436d..9d7e1829a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -669,4 +669,58 @@ class PaimonMetadataApplierTest {
Assertions.assertThat(table).isNotNull();
}
+
+ /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+ private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00
00:00:00.000000";
+
+ @Test
+ public void testMysqlDefaultTimestampValueWithMicrosInAddColumn()
+ throws SchemaEvolveException,
+ Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize("filesystem");
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("bucket", "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.timestamp_micros_test"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "name",
+
org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .primaryKey("id")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ List<AddColumnEvent.ColumnWithPosition> addedColumns = new
ArrayList<>();
+ addedColumns.add(
+ AddColumnEvent.last(
+ Column.physicalColumn(
+ "created_time",
+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS)));
+ addedColumns.add(
+ AddColumnEvent.last(
+ Column.physicalColumn(
+ "updated_time",
+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS)));
+
+ AddColumnEvent addColumnEvent =
+ new
AddColumnEvent(TableId.parse("test.timestamp_micros_test"), addedColumns);
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ Table table =
catalog.getTable(Identifier.fromString("test.timestamp_micros_test"));
+
+ Assertions.assertThat(table).isNotNull();
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index f1e13bc09..fd1d3c068 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -582,4 +582,105 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
assertEqualsInOrder(expected, actual);
}
+
+ /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+ private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00
00:00:00.000000";
+
+ @Test
+ void testMysqlDefaultTimestampValueWithMicrosInCreateTable() throws
Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(50), null))
+ .column(
+ new PhysicalColumn(
+ "created_time",
+ DataTypes.TIMESTAMP(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS))
+ .column(
+ new PhysicalColumn(
+ "updated_time",
+ DataTypes.TIMESTAMP_LTZ(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS))
+ .primaryKey("id")
+ .build();
+
+ runJobWithEvents(Collections.singletonList(new
CreateTableEvent(tableId, schema)));
+
+ List<String> actual = inspectTableSchema(tableId);
+
+ List<String> expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "name | varchar(150) | YES | false | null",
+ "created_time | datetime | YES | false | "
+ + StarRocksUtils.DEFAULT_DATETIME,
+ "updated_time | datetime | YES | false | "
+ + StarRocksUtils.DEFAULT_DATETIME);
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ void testMysqlDefaultTimestampValueWithMicrosInAddColumn() throws
Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ Schema initialSchema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(50), null))
+ .primaryKey("id")
+ .build();
+
+ List<Event> events = new ArrayList<>();
+ events.add(new CreateTableEvent(tableId, initialSchema));
+
+ PhysicalColumn createdTimeCol =
+ new PhysicalColumn(
+ "created_time", DataTypes.TIMESTAMP(6), null,
INVALID_DATETIME_WITH_MICROS);
+
+ PhysicalColumn updatedTimeCol =
+ new PhysicalColumn(
+ "updated_time",
+ DataTypes.TIMESTAMP_LTZ(6),
+ null,
+ INVALID_DATETIME_WITH_MICROS);
+
+ events.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+ events.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+ runJobWithEvents(events);
+
+ List<String> actual = inspectTableSchema(tableId);
+
+ List<String> expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "name | varchar(150) | YES | false | null",
+ "created_time | datetime | YES | false | "
+ + StarRocksUtils.DEFAULT_DATETIME,
+ "updated_time | datetime | YES | false | "
+ + StarRocksUtils.DEFAULT_DATETIME);
+
+ assertEqualsInOrder(expected, actual);
+ }
}