This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 619903c1c [FLINK-39209][doris] Fix time data type serialization when 
sink to doris with pipeline connector (#4312)
619903c1c is described below

commit 619903c1c06955970a2cbfd9e7ec7765a03f08e6
Author: chengcongchina <[email protected]>
AuthorDate: Wed Mar 18 17:52:41 2026 +0800

    [FLINK-39209][doris] Fix time data type serialization when sink to doris 
with pipeline connector (#4312)
---
 .../doris/sink/DorisEventSerializer.java           |  7 +++++
 .../connectors/doris/sink/DorisRowConverter.java   | 13 ++++++++-
 .../doris/sink/DorisEventSerializerTest.java       | 34 ++++++++++++++++++++++
 .../cdc/pipeline/tests/MySqlToDorisE2eITCase.java  | 22 ++++++++------
 .../src/test/resources/ddl/data_types_test.sql     |  4 ++-
 5 files changed, 69 insertions(+), 11 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
index 983a2c916..f74cfebeb 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
@@ -67,6 +67,13 @@ public class DorisEventSerializer implements 
DorisRecordSerializer<Event> {
     public static final DateTimeFormatter DATE_TIME_FORMATTER =
             DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
+    /** Format TIME type data without precision. */
+    public static final DateTimeFormatter TIME_FORMATTER = 
DateTimeFormatter.ofPattern("HH:mm:ss");
+
+    /** Format TIME type data with millisecond precision. */
+    public static final DateTimeFormatter TIME_WITH_MILLISECOND_FORMATTER =
+            DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
+
     /** ZoneId from pipeline config to support timestamp with local time zone. 
*/
     public final ZoneId pipelineZoneId;
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
index b1b67a1e2..205c94b62 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
@@ -117,7 +117,18 @@ public class DorisRowConverter implements Serializable {
                 final int zonedP = ((ZonedTimestampType) type).getPrecision();
                 return (index, val) -> val.getTimestamp(index, 
zonedP).toTimestamp();
             case TIME_WITHOUT_TIME_ZONE:
-                return (index, val) -> val.getTime(index).toLocalTime();
+                return (index, val) -> {
+                    int precision = DataTypeChecks.getPrecision(type);
+                    if (precision == 0) {
+                        return val.getTime(index)
+                                .toLocalTime()
+                                .format(DorisEventSerializer.TIME_FORMATTER);
+                    } else {
+                        return val.getTime(index)
+                                .toLocalTime()
+                                
.format(DorisEventSerializer.TIME_WITH_MILLISECOND_FORMATTER);
+                    }
+                };
             case ARRAY:
                 return (index, val) -> convertArrayData(val.getArray(index), 
type);
             case MAP:
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
index e64643226..5bee97995 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.doris.sink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -41,6 +42,7 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
@@ -67,6 +69,38 @@ public class DorisEventSerializerTest {
     private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR =
             new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType()));
 
+    @Test
+    public void testDataChangeEventWithTimeDataType() throws IOException {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id_", DataTypes.BIGINT().notNull())
+                        .physicalColumn("time_0_", DataTypes.TIME(0))
+                        .physicalColumn("time_3_", DataTypes.TIME(3))
+                        .primaryKey("id_")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(((RowType) 
schema.toRowDataType()));
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, 
schema);
+        DataChangeEvent dataChangeEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_ID,
+                        generator.generate(
+                                new Object[] {
+                                    1L,
+                                    TimeData.fromLocalTime(LocalTime.of(19, 
43, 17)),
+                                    TimeData.fromLocalTime(LocalTime.of(21, 
45, 3, 123000000)),
+                                }));
+
+        dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), new 
Configuration());
+        dorisEventSerializer.serialize(createTableEvent);
+        DorisRecord dorisRecord = 
dorisEventSerializer.serialize(dataChangeEvent);
+        JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+
+        Assertions.assertThat(jsonNode.get("id_").asLong()).isEqualTo(1L);
+        
Assertions.assertThat(jsonNode.get("time_0_").asText()).isEqualTo("19:43:17");
+        
Assertions.assertThat(jsonNode.get("time_3_").asText()).isEqualTo("21:45:03.123");
+    }
+
     @Test
     public void testDataChangeEventWithDateTimePartitionColumn() throws 
IOException {
         Map<String, String> configMap = new HashMap<>();
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
index 351e3ccea..25ba4832d 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
@@ -475,6 +475,8 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment 
{
                         "datetime3_c | DATETIME(3) | Yes | false | null",
                         "datetime6_c | DATETIME(6) | Yes | false | null",
                         "timestamp_c | DATETIME | Yes | false | null",
+                        "time_c | TEXT | Yes | false | null",
+                        "time3_c | TEXT | Yes | false | null",
                         "text_c | TEXT | Yes | false | null",
                         "tiny_blob_c | TEXT | Yes | false | null",
                         "blob_c | TEXT | Yes | false | null",
@@ -494,9 +496,9 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment 
{
         validateSinkResult(
                 databaseName,
                 "DATA_TYPES_TABLE",
-                52,
+                54,
                 Collections.singletonList(
-                        "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 
18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | 
red | {\"coordinates\":[1,1],\"ty [...]
+                        "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 
18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== 
| EA== | EA== | 2021 | red | { [...]
 
         LOG.info("Verifying streaming stage of DATA_TYPES_TABLE...");
         // generate binlogs
@@ -519,7 +521,7 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment 
{
                                 + "        4294967295, 4294967295, 2147483647, 
9223372036854775807,\n"
                                 + "        'Hello World', 'abc', 123.102, 
123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,\n"
                                 + "        123.4567, 123.4568, 123.4569, 
345.6, 34567892.1, 0, 1, true,\n"
-                                + "        '2020-07-17',  '2020-07-17 
18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',\n"
+                                + "        '2020-07-17',  '2020-07-17 
18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', '14:38:07', 
'21:49:13.123',\n"
                                 + "        'text', UNHEX(HEX(16)), 
UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,\n"
                                 + "        'red',\n"
                                 + "        ST_GeomFromText('POINT(1 1)'),\n"
@@ -535,11 +537,11 @@ class MySqlToDorisE2eITCase extends 
PipelineTestEnvironment {
             validateSinkResult(
                     databaseName,
                     "DATA_TYPES_TABLE",
-                    52,
+                    54,
                     Arrays.asList(
-                            "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== 
| 2021 | red | {\"coordinates\":[1,1], [...]
-                            "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== 
| 2021 | red | {\"coordinates\":[1,1], [...]
-                            "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== 
| 2021 | red | {\"coordinates\":[1,1], [...]
+                            "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | 
EA== | EA== | EA== | EA== | 2021 | red [...]
+                            "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | 
EA== | EA== | EA== | EA== | 2021 | red [...]
+                            "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | 
EA== | EA== | EA== | EA== | 2021 | red [...]
         } catch (SQLException e) {
             LOG.error("Update table for CDC failed.", e);
             throw e;
@@ -631,6 +633,8 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment 
{
                         "datetime3_c | DATETIME(3) | Yes | false | null",
                         "datetime6_c | DATETIME(6) | Yes | false | null",
                         "timestamp_c | DATETIME | Yes | false | null",
+                        "time_c | TEXT | Yes | false | null",
+                        "time3_c | TEXT | Yes | false | null",
                         "text_c | TEXT | Yes | false | null",
                         "tiny_blob_c | TEXT | Yes | false | null",
                         "blob_c | TEXT | Yes | false | null",
@@ -650,9 +654,9 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment 
{
         validateSinkResult(
                 databaseName,
                 "DATA_TYPES_TABLE",
-                52,
+                54,
                 Collections.singletonList(
-                        "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 
18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | 
red | {\"coordinates\":[1,1],\"ty [...]
+                        "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 
18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== 
| EA== | EA== | 2021 | red | { [...]
     }
 
     @Test
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
index 4c7621250..80bd4366a 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
@@ -55,6 +55,8 @@ CREATE TABLE DATA_TYPES_TABLE
     datetime3_c          DATETIME(3),
     datetime6_c          DATETIME(6),
     timestamp_c          TIMESTAMP,
+    time_c               TIME(0),
+    time3_c              TIME(3),
     text_c               TEXT,
     tiny_blob_c          TINYBLOB,
     blob_c               BLOB,
@@ -78,7 +80,7 @@ VALUES (1, 127, 255, 255, 32767, 65535, 65535, 8388607, 
16777215, 16777215, 2147
         4294967295, 4294967295, 2147483647, 9223372036854775807,
         'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 
404.4444, 404.4445,
         123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,
-        '2020-07-17',  '2020-07-17 18:00:22.123', '2020-07-17 
18:00:22.123456', '2020-07-17 18:00:22',
+        '2020-07-17',  '2020-07-17 18:00:22.123', '2020-07-17 
18:00:22.123456', '2020-07-17 18:00:22', '14:38:07', '21:49:13.123',
         'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 
UNHEX(HEX(16)), 2021,
         'red',
         ST_GeomFromText('POINT(1 1)'),

Reply via email to