This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 25a5817cd [FLINK-38248] The microseconds default value of '0000-00-00 
00:00:00.000000' for MySQL TIMESTAMP fields is not supported in downstream 
systems. (#4251)
25a5817cd is described below

commit 25a5817cd095bfe7329cf9e3f36f232616bec358
Author: Patrick <[email protected]>
AuthorDate: Mon Mar 30 13:39:53 2026 +0800

    [FLINK-38248] The microseconds default value of '0000-00-00 
00:00:00.000000' for MySQL TIMESTAMP fields is not supported in downstream 
systems. (#4251)
---
 .../doris/sink/DorisMetadataApplier.java           |   2 +-
 .../doris/sink/DorisMetadataApplierITCase.java     | 102 +++++++++++++++++++++
 .../paimon/sink/SchemaChangeProvider.java          |   2 +-
 .../paimon/sink/PaimonMetadataApplierTest.java     |  54 +++++++++++
 .../sink/StarRocksMetadataApplierITCase.java       | 101 ++++++++++++++++++++
 5 files changed, 259 insertions(+), 2 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index afb01cc8e..de03dbc53 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -335,7 +335,7 @@ public class DorisMetadataApplier implements 
MetadataApplier {
                 || dataType instanceof TimestampType
                 || dataType instanceof ZonedTimestampType) {
 
-            if 
(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+            if 
(defaultValue.startsWith(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) {
                 return DorisSchemaUtils.DEFAULT_DATETIME;
             }
         }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 43384f7e7..b8c48d7a1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -630,6 +630,108 @@ class DorisMetadataApplierITCase extends 
DorisSinkTestBase {
         assertEqualsInOrder(expected, actual);
     }
 
+    /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+    private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 
00:00:00.000000";
+
+    @ParameterizedTest(name = "batchMode: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMysqlDefaultTimestampValueWithMicrosInCreateTable(boolean 
batchMode) throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        DorisContainer.DORIS_DATABASE_NAME, 
DorisContainer.DORIS_TABLE_NAME);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(50), null))
+                        .column(
+                                new PhysicalColumn(
+                                        "created_time",
+                                        DataTypes.TIMESTAMP(6),
+                                        null,
+                                        INVALID_DATETIME_WITH_MICROS))
+                        .column(
+                                new PhysicalColumn(
+                                        "updated_time",
+                                        DataTypes.TIMESTAMP_LTZ(6),
+                                        null,
+                                        INVALID_DATETIME_WITH_MICROS))
+                        .primaryKey("id")
+                        .build();
+
+        runJobWithEvents(
+                Collections.singletonList(new CreateTableEvent(tableId, 
schema)), batchMode);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | INT | Yes | true | null",
+                        "name | VARCHAR(150) | Yes | false | null",
+                        "created_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME,
+                        "updated_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @ParameterizedTest(name = "batchMode: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMysqlDefaultTimestampValueWithMicrosInAddColumn(boolean 
batchMode) throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        DorisContainer.DORIS_DATABASE_NAME, 
DorisContainer.DORIS_TABLE_NAME);
+
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(50), null))
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(tableId, initialSchema));
+
+        PhysicalColumn createdTimeCol =
+                new PhysicalColumn(
+                        "created_time", DataTypes.TIMESTAMP(6), null, 
INVALID_DATETIME_WITH_MICROS);
+
+        PhysicalColumn updatedTimeCol =
+                new PhysicalColumn(
+                        "updated_time",
+                        DataTypes.TIMESTAMP_LTZ(6),
+                        null,
+                        INVALID_DATETIME_WITH_MICROS);
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new 
AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new 
AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+        runJobWithEvents(events, batchMode);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | INT | Yes | true | null",
+                        "name | VARCHAR(150) | Yes | false | null",
+                        "created_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME,
+                        "updated_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
     private void runJobWithEvents(List<Event> events, boolean batchMode) 
throws Exception {
         DataStream<Event> stream = env.fromData(events, new 
EventTypeInfo()).setParallelism(1);
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
index 22ee7ade1..5e8beacc9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -161,7 +161,7 @@ public class SchemaChangeProvider {
                 || dataType instanceof TimestampType
                 || dataType instanceof ZonedTimestampType) {
 
-            if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+            if (defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
                 return DEFAULT_DATETIME;
             }
         }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 7f0ed436d..9d7e1829a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -669,4 +669,58 @@ class PaimonMetadataApplierTest {
 
         Assertions.assertThat(table).isNotNull();
     }
+
+    /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+    private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 
00:00:00.000000";
+
+    @Test
+    public void testMysqlDefaultTimestampValueWithMicrosInAddColumn()
+            throws SchemaEvolveException,
+                    Catalog.TableNotExistException,
+                    Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException {
+        initialize("filesystem");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "-1");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
+
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.timestamp_micros_test"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+                                .physicalColumn(
+                                        "name",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .primaryKey("id")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        List<AddColumnEvent.ColumnWithPosition> addedColumns = new 
ArrayList<>();
+        addedColumns.add(
+                AddColumnEvent.last(
+                        Column.physicalColumn(
+                                "created_time",
+                                
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6),
+                                null,
+                                INVALID_DATETIME_WITH_MICROS)));
+        addedColumns.add(
+                AddColumnEvent.last(
+                        Column.physicalColumn(
+                                "updated_time",
+                                
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6),
+                                null,
+                                INVALID_DATETIME_WITH_MICROS)));
+
+        AddColumnEvent addColumnEvent =
+                new 
AddColumnEvent(TableId.parse("test.timestamp_micros_test"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+
+        Table table = 
catalog.getTable(Identifier.fromString("test.timestamp_micros_test"));
+
+        Assertions.assertThat(table).isNotNull();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index f1e13bc09..fd1d3c068 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -582,4 +582,105 @@ class StarRocksMetadataApplierITCase extends 
StarRocksSinkTestBase {
 
         assertEqualsInOrder(expected, actual);
     }
+
+    /** Microsecond variant: '0000-00-00 00:00:00.000000'. */
+    private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 
00:00:00.000000";
+
+    @Test
+    void testMysqlDefaultTimestampValueWithMicrosInCreateTable() throws 
Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(50), null))
+                        .column(
+                                new PhysicalColumn(
+                                        "created_time",
+                                        DataTypes.TIMESTAMP(6),
+                                        null,
+                                        INVALID_DATETIME_WITH_MICROS))
+                        .column(
+                                new PhysicalColumn(
+                                        "updated_time",
+                                        DataTypes.TIMESTAMP_LTZ(6),
+                                        null,
+                                        INVALID_DATETIME_WITH_MICROS))
+                        .primaryKey("id")
+                        .build();
+
+        runJobWithEvents(Collections.singletonList(new 
CreateTableEvent(tableId, schema)));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "name | varchar(150) | YES | false | null",
+                        "created_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME,
+                        "updated_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    void testMysqlDefaultTimestampValueWithMicrosInAddColumn() throws 
Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(50), null))
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(tableId, initialSchema));
+
+        PhysicalColumn createdTimeCol =
+                new PhysicalColumn(
+                        "created_time", DataTypes.TIMESTAMP(6), null, 
INVALID_DATETIME_WITH_MICROS);
+
+        PhysicalColumn updatedTimeCol =
+                new PhysicalColumn(
+                        "updated_time",
+                        DataTypes.TIMESTAMP_LTZ(6),
+                        null,
+                        INVALID_DATETIME_WITH_MICROS);
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new 
AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new 
AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+        runJobWithEvents(events);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "name | varchar(150) | YES | false | null",
+                        "created_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME,
+                        "updated_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
 }

Reply via email to