This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 619903c1c [FLINK-39209][doris] Fix time data type serialization when
sink to doris with pipeline connector (#4312)
619903c1c is described below
commit 619903c1c06955970a2cbfd9e7ec7765a03f08e6
Author: chengcongchina <[email protected]>
AuthorDate: Wed Mar 18 17:52:41 2026 +0800
[FLINK-39209][doris] Fix time data type serialization when sink to doris
with pipeline connector (#4312)
---
.../doris/sink/DorisEventSerializer.java | 7 +++++
.../connectors/doris/sink/DorisRowConverter.java | 13 ++++++++-
.../doris/sink/DorisEventSerializerTest.java | 34 ++++++++++++++++++++++
.../cdc/pipeline/tests/MySqlToDorisE2eITCase.java | 22 ++++++++------
.../src/test/resources/ddl/data_types_test.sql | 4 ++-
5 files changed, 69 insertions(+), 11 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
index 983a2c916..f74cfebeb 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
@@ -67,6 +67,13 @@ public class DorisEventSerializer implements
DorisRecordSerializer<Event> {
public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+ /** Format TIME type data without precision. */
+ public static final DateTimeFormatter TIME_FORMATTER =
DateTimeFormatter.ofPattern("HH:mm:ss");
+
+ /** Format TIME type data with millisecond precision. */
+ public static final DateTimeFormatter TIME_WITH_MILLISECOND_FORMATTER =
+ DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
+
/** ZoneId from pipeline config to support timestamp with local time zone.
*/
public final ZoneId pipelineZoneId;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
index b1b67a1e2..205c94b62 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java
@@ -117,7 +117,18 @@ public class DorisRowConverter implements Serializable {
final int zonedP = ((ZonedTimestampType) type).getPrecision();
return (index, val) -> val.getTimestamp(index,
zonedP).toTimestamp();
case TIME_WITHOUT_TIME_ZONE:
- return (index, val) -> val.getTime(index).toLocalTime();
+ return (index, val) -> {
+ int precision = DataTypeChecks.getPrecision(type);
+ if (precision == 0) {
+ return val.getTime(index)
+ .toLocalTime()
+ .format(DorisEventSerializer.TIME_FORMATTER);
+ } else {
+ return val.getTime(index)
+ .toLocalTime()
+
.format(DorisEventSerializer.TIME_WITH_MILLISECOND_FORMATTER);
+ }
+ };
case ARRAY:
return (index, val) -> convertArrayData(val.getArray(index),
type);
case MAP:
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
index e64643226..5bee97995 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.doris.sink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -41,6 +42,7 @@ import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
@@ -67,6 +69,38 @@ public class DorisEventSerializerTest {
private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR =
new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType()));
+ @Test
+ public void testDataChangeEventWithTimeDataType() throws IOException {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id_", DataTypes.BIGINT().notNull())
+ .physicalColumn("time_0_", DataTypes.TIME(0))
+ .physicalColumn("time_3_", DataTypes.TIME(3))
+ .primaryKey("id_")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(((RowType)
schema.toRowDataType()));
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID,
schema);
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_ID,
+ generator.generate(
+ new Object[] {
+ 1L,
+ TimeData.fromLocalTime(LocalTime.of(19,
43, 17)),
+ TimeData.fromLocalTime(LocalTime.of(21,
45, 3, 123000000)),
+ }));
+
+ dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), new
Configuration());
+ dorisEventSerializer.serialize(createTableEvent);
+ DorisRecord dorisRecord =
dorisEventSerializer.serialize(dataChangeEvent);
+ JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+
+ Assertions.assertThat(jsonNode.get("id_").asLong()).isEqualTo(1L);
+
Assertions.assertThat(jsonNode.get("time_0_").asText()).isEqualTo("19:43:17");
+
Assertions.assertThat(jsonNode.get("time_3_").asText()).isEqualTo("21:45:03.123");
+ }
+
@Test
public void testDataChangeEventWithDateTimePartitionColumn() throws
IOException {
Map<String, String> configMap = new HashMap<>();
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
index 351e3ccea..25ba4832d 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
@@ -475,6 +475,8 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment
{
"datetime3_c | DATETIME(3) | Yes | false | null",
"datetime6_c | DATETIME(6) | Yes | false | null",
"timestamp_c | DATETIME | Yes | false | null",
+ "time_c | TEXT | Yes | false | null",
+ "time3_c | TEXT | Yes | false | null",
"text_c | TEXT | Yes | false | null",
"tiny_blob_c | TEXT | Yes | false | null",
"blob_c | TEXT | Yes | false | null",
@@ -494,9 +496,9 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment
{
validateSinkResult(
databaseName,
"DATA_TYPES_TABLE",
- 52,
+ 54,
Collections.singletonList(
- "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 |
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 |
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17
18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 |
red | {\"coordinates\":[1,1],\"ty [...]
+ "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 |
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 |
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17
18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA==
| EA== | EA== | 2021 | red | { [...]
LOG.info("Verifying streaming stage of DATA_TYPES_TABLE...");
// generate binlogs
@@ -519,7 +521,7 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment
{
+ " 4294967295, 4294967295, 2147483647,
9223372036854775807,\n"
+ " 'Hello World', 'abc', 123.102,
123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,\n"
+ " 123.4567, 123.4568, 123.4569,
345.6, 34567892.1, 0, 1, true,\n"
- + " '2020-07-17', '2020-07-17
18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',\n"
+ + " '2020-07-17', '2020-07-17
18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', '14:38:07',
'21:49:13.123',\n"
+ " 'text', UNHEX(HEX(16)),
UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,\n"
+ " 'red',\n"
+ " ST_GeomFromText('POINT(1 1)'),\n"
@@ -535,11 +537,11 @@ class MySqlToDorisE2eITCase extends
PipelineTestEnvironment {
validateSinkResult(
databaseName,
"DATA_TYPES_TABLE",
- 52,
+ 54,
Arrays.asList(
- "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA==
| 2021 | red | {\"coordinates\":[1,1], [...]
- "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA==
| 2021 | red | {\"coordinates\":[1,1], [...]
- "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA==
| 2021 | red | {\"coordinates\":[1,1], [...]
+ "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text |
EA== | EA== | EA== | EA== | 2021 | red [...]
+ "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text |
EA== | EA== | EA== | EA== | 2021 | red [...]
+ "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 |
8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 |
2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 |
123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 |
123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 |
2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text |
EA== | EA== | EA== | EA== | 2021 | red [...]
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
@@ -631,6 +633,8 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment
{
"datetime3_c | DATETIME(3) | Yes | false | null",
"datetime6_c | DATETIME(6) | Yes | false | null",
"timestamp_c | DATETIME | Yes | false | null",
+ "time_c | TEXT | Yes | false | null",
+ "time3_c | TEXT | Yes | false | null",
"text_c | TEXT | Yes | false | null",
"tiny_blob_c | TEXT | Yes | false | null",
"blob_c | TEXT | Yes | false | null",
@@ -650,9 +654,9 @@ class MySqlToDorisE2eITCase extends PipelineTestEnvironment
{
validateSinkResult(
databaseName,
"DATA_TYPES_TABLE",
- 52,
+ 54,
Collections.singletonList(
- "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 |
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 |
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17
18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 |
red | {\"coordinates\":[1,1],\"ty [...]
+ "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607
| 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 |
9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104
| 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 |
34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17
18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA==
| EA== | EA== | 2021 | red | { [...]
}
@Test
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
index 4c7621250..80bd4366a 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
@@ -55,6 +55,8 @@ CREATE TABLE DATA_TYPES_TABLE
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP,
+ time_c TIME(0),
+ time3_c TIME(3),
text_c TEXT,
tiny_blob_c TINYBLOB,
blob_c BLOB,
@@ -78,7 +80,7 @@ VALUES (1, 127, 255, 255, 32767, 65535, 65535, 8388607,
16777215, 16777215, 2147
4294967295, 4294967295, 2147483647, 9223372036854775807,
'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443,
404.4444, 404.4445,
123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,
- '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17
18:00:22.123456', '2020-07-17 18:00:22',
+ '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17
18:00:22.123456', '2020-07-17 18:00:22', '14:38:07', '21:49:13.123',
'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)),
UNHEX(HEX(16)), 2021,
'red',
ST_GeomFromText('POINT(1 1)'),