This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a1a72ec2ac [fix](streaming-job) cdc client MySQL TIME range fix with
type-consistency and GTID guards (#64741)
0a1a72ec2ac is described below
commit 0a1a72ec2acc8e8e70f0425bad4fc0a9ed8637c5
Author: wudi <[email protected]>
AuthorDate: Thu Jun 25 19:14:36 2026 +0800
[fix](streaming-job) cdc client MySQL TIME range fix with type-consistency
and GTID guards (#64741)
## Proposed changes
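- `DebeziumJsonDeserializer.convertToTime`: negative and `>=24h` TIME
values now format as `±HH:MM:SS[.ffffff]` instead of falling back to the
raw long literal; this applies to the millisecond, microsecond and
nanosecond integer encodings alike. In-range values (MySQL and PostgreSQL)
are unaffected; the only PostgreSQL-visible change is that the legal
`24:00:00` upper bound now formats as `24:00:00` instead of a raw long.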
- Adds snapshot-vs-binlog type-consistency ITCases (MySQL + PostgreSQL):
assert the snapshot (JDBC) and binlog (decoding) paths deserialize every
column identically; JSON is compared by parsed value to tolerate
whitespace/key-order.
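- Adds a GTID multi-interval guard unit test: pins that a GTID set with
multiple disjoint intervals per server uuid survives parsing /
serialization / offset-map round-trip without the gap being merged away.
- `MySqlSourceReader`: a specific-offset startup now also accepts a
GTID-only offset (no binlog file/position), and a file/position offset
keeps its other fields (such as `gtids`) instead of dropping them.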
---
.../deserialize/DebeziumJsonDeserializer.java | 50 ++++-
.../source/reader/mysql/MySqlSourceReader.java | 25 +--
.../itcase/MySqlStartupGtidOffsetITCase.java | 165 ++++++++++++++++
.../cdcclient/itcase/MySqlTimeRangeITCase.java | 149 ++++++++++++++
.../itcase/MySqlTypeConsistencyITCase.java | 214 +++++++++++++++++++++
.../cdcclient/itcase/PostgresTimeRangeITCase.java | 126 ++++++++++++
.../itcase/PostgresTypeConsistencyITCase.java | 206 ++++++++++++++++++++
.../deserialize/DebeziumJsonDeserializerTest.java | 104 ++++++++++
.../reader/mysql/GtidMultiIntervalOffsetTest.java | 95 +++++++++
.../source/reader/mysql/MySqlSourceReaderTest.java | 55 ++++++
10 files changed, 1173 insertions(+), 16 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 8487975f7d1..36edf1c5b2d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -34,15 +34,18 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.DateTimeException;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -412,19 +415,56 @@ public class DebeziumJsonDeserializer
return dbzObj.toString();
}
+    /**
+     * Formats a signed time span, given as total microseconds, as MySQL TIME literal text
+     * {@code ±HH:MM:SS[.ffffff]}.
+     *
+     * <p>Used for values {@link LocalTime} cannot represent: negative spans and spans of 24h or
+     * more (MySQL TIME covers [-838:59:59, 838:59:59]). The hour field is not limited to two
+     * digits (e.g. {@code 838}). Trailing zeros of the fractional part are removed, and the
+     * fraction is omitted entirely when it is zero.
+     *
+     * @param microsTotal signed number of microseconds; any {@code long} is accepted, including
+     *     {@link Long#MIN_VALUE}
+     * @return the formatted text, prefixed with {@code -} when {@code microsTotal} is negative
+     */
+    private static String formatTimeText(long microsTotal) {
+        String sign = microsTotal < 0 ? "-" : "";
+        // Take the magnitude on the Duration instead of via Math.abs(long): Math.abs(Long.MIN_VALUE)
+        // overflows back to a negative number, which would yield negative minute/second parts.
+        Duration d = Duration.of(microsTotal, ChronoUnit.MICROS).abs();
+        StringBuilder sb = new StringBuilder(sign);
+        // toHours may exceed two digits (e.g. 838); minute/second parts are always < 60.
+        sb.append(
+                String.format(
+                        Locale.ROOT,
+                        "%02d:%02d:%02d",
+                        d.toHours(),
+                        d.toMinutesPart(),
+                        d.toSecondsPart()));
+        long micros = d.toNanosPart() / 1000L;
+        if (micros > 0) {
+            // Six-digit zero-padded micros, then drop trailing zeros (e.g. 500000 -> "5").
+            sb.append('.')
+                    .append(StringUtils.stripEnd(String.format(Locale.ROOT, "%06d", micros), "0"));
+        }
+        return sb.toString();
+    }
+
protected Object convertToTime(Object dbzObj, Schema schema) {
try {
if (dbzObj instanceof Long) {
+ long v = (Long) dbzObj;
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
- // micro to nano
- return LocalTime.ofNanoOfDay((Long) dbzObj *
1000L).toString();
+ // MySQL TIME spans [-838:59:59, 838:59:59];
out-of-range (negative or
+ // >=24h) cannot use LocalTime, format as ±HH:MM:SS
instead. micro to nano.
+ if (v >= 0 && v < 86_400_000_000L) {
+ return LocalTime.ofNanoOfDay(v * 1000L).toString();
+ }
+ return formatTimeText(v);
case NanoTime.SCHEMA_NAME:
- return LocalTime.ofNanoOfDay((Long) dbzObj).toString();
+ if (v >= 0 && v < 86_400_000_000_000L) {
+ return LocalTime.ofNanoOfDay(v).toString();
+ }
+ // out-of-range: nano to micro. MySQL/PG TIME carries
at most microsecond
+ // precision, so dropping the sub-micro nanos here is
lossless for them.
+ return formatTimeText(v / 1000L);
}
} else if (dbzObj instanceof Integer) {
- // millis to nano
- return LocalTime.ofNanoOfDay((Integer) dbzObj *
1_000_000L).toString();
+ // millis to nano; out-of-range formats as ±HH:MM:SS.
+ int v = (Integer) dbzObj;
+ if (v >= 0 && v < 86_400_000) {
+ return LocalTime.ofNanoOfDay((long) v *
1_000_000L).toString();
+ }
+ return formatTimeText((long) v * 1000L);
} else if (dbzObj instanceof java.util.Date) {
long millisOfDay = ((Date) dbzObj).getTime() % (24 * 60 * 60 *
1000);
// mills to nano
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index efea979a62c..6891f06ba42 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -957,20 +957,23 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
if (MapUtils.isEmpty(offsetMap)) {
throw new RuntimeException("Incorrect offset " + startupMode);
}
- if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
- &&
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
- BinlogOffset binlogOffset =
- BinlogOffset.builder()
- .setBinlogFilePosition(
-
offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY),
- Long.parseLong(
- offsetMap.get(
-
BinlogOffset.BINLOG_POSITION_OFFSET_KEY)))
- .build();
-
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
+ boolean hasFilePosition =
+
offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
+ &&
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY);
+ boolean hasGtids =
offsetMap.containsKey(BinlogOffset.GTID_SET_KEY);
+ BinlogOffset binlogOffset;
+ if (hasFilePosition) {
+ // Keep the full map so gtids and other fields survive;
supplement kind.
+ offsetMap.putIfAbsent(
+ BinlogOffset.OFFSET_KIND_KEY,
BinlogOffsetKind.SPECIFIC.name());
+ binlogOffset = new BinlogOffset(offsetMap);
+ } else if (hasGtids) {
+ // ofGtidSet seeds placeholder file/pos and kind, like Flink
CDC.
+ binlogOffset =
BinlogOffset.ofGtidSet(offsetMap.get(BinlogOffset.GTID_SET_KEY));
} else {
throw new RuntimeException("Incorrect offset " + startupMode);
}
+
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
} else if (is13Timestamp(startupMode)) {
// start from timestamp
Long ts = Long.parseLong(startupMode);
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
new file mode 100644
index 00000000000..cdadfb2561f
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * GTID counterpart of {@link MySqlStartupSpecificOffsetITCase}: given a recorded GTID set as the
+ * specific-offset (no binlog file/position), the job must replay from exactly that point forward —
+ * proving the GTID-only resume offset is honored end-to-end, not just preserved in config.
+ */
+@Testcontainers
+class MySqlStartupGtidOffsetITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(980_000);
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withCommand("--gtid-mode=ON", "--enforce-gtid-consistency=ON")
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a per-test database whose two rows exist before the GTID set is recorded. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "gtidoff_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute("CREATE TABLE t_user (id INT PRIMARY KEY, name VARCHAR(50))");
+            st.execute("INSERT INTO t_user VALUES (1,'alice'), (2,'bob')");
+        }
+    }
+
+    /** Closes the job's resources and drops the per-test database. */
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("DROP DATABASE IF EXISTS " + database);
+        }
+    }
+
+    @Test
+    void specificGtidOffsetReplaysFromRecordedPositionForward() throws Exception {
+        // Record the GTID set right after the pre-existing rows; resume must start past it.
+        String offset = gtidOffsetJson(currentGtidExecuted());
+
+        // This change happens after the recorded GTID but before the job starts — it must be read.
+        insert("INSERT INTO t_user VALUES (3,'carol')");
+
+        try (MockDorisServer mock = new MockDorisServer();
+                CdcClientWriteHarness harness =
+                        CdcClientWriteHarness.mysql(
+                                jobId,
+                                MYSQL.getHost(),
+                                MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                                ROOT_USER,
+                                ROOT_PASSWORD,
+                                database,
+                                "t_user",
+                                offset,
+                                "doris_target_db",
+                                mock)) {
+
+            // Resume positions by GTID, so row 3 (already in the binlog) is read straight away.
+            List<Integer> first = ids(harness.readBinlogFromStartupMode(1, Duration.ofSeconds(90)));
+            assertThat(first).containsExactly(3);
+
+            insert("INSERT INTO t_user VALUES (4,'dave')");
+            List<Integer> second = ids(harness.continueBinlog(1, Duration.ofSeconds(90)));
+            assertThat(second).containsExactly(4);
+
+            // Replays from the recorded GTID forward: rows 3 and 4 only, never 1 or 2.
+            List<Integer> all = ids(harness.loadedRecords());
+            assertThat(all).doesNotContain(1, 2);
+            assertThat(all).containsExactlyInAnyOrder(3, 4);
+        }
+    }
+
+    /**
+     * Builds the {@code {"gtids": "<set>"}} startup offset with Jackson, so the GTID text is
+     * JSON-escaped instead of being spliced into a format string.
+     */
+    private static String gtidOffsetJson(String gtids) throws Exception {
+        return MAPPER.writeValueAsString(MAPPER.createObjectNode().put("gtids", gtids));
+    }
+
+    /** Returns the server's current {@code @@gtid_executed} as a single-line GTID set. */
+    private String currentGtidExecuted() throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement();
+                ResultSet rs = st.executeQuery("SELECT @@gtid_executed")) {
+            if (!rs.next()) {
+                throw new IllegalStateException("@@gtid_executed returned no row");
+            }
+            // A set spanning several server uuids may contain line breaks between its entries;
+            // strip them so the set can be embedded in the offset as one line.
+            return rs.getString(1).replace("\n", "").trim();
+        }
+    }
+
+    /** Extracts the {@code id} field of each JSON record, preserving order. */
+    private List<Integer> ids(List<String> records) throws Exception {
+        List<Integer> result = new ArrayList<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.add(node.get("id").asInt());
+        }
+        return result;
+    }
+
+    /** Executes one statement against the per-test database. */
+    private void insert(String sql) throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement()) {
+            st.execute(sql);
+        }
+    }
+
+    /** Opens a root JDBC connection to the container, optionally bound to {@code db}. */
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
new file mode 100644
index 00000000000..f0d960ca410
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * End-to-end coverage of MySQL TIME full range [-838:59:59, 838:59:59] across the snapshot and
+ * binlog phases. Out-of-range values (negative or >=24h) must be emitted as ±HH:MM:SS[.ffffff]
+ * text rather than the raw long literal they fell back to before the convertToTime fix.
+ */
+@Testcontainers
+class MySqlTimeRangeITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(810_000);
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a per-test database holding the snapshot-phase TIME rows. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "time_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute("CREATE TABLE t_time (id INT NOT NULL, t_col TIME(6), PRIMARY KEY (id))");
+            // snapshot rows: in-range, negative lower bound, positive upper bound.
+            // MySQL TIME bounds are exactly +/-838:59:59 (no fractional part allowed at the bound).
+            st.execute(
+                    "INSERT INTO t_time VALUES "
+                            + "(1,'12:34:56.123456'), (2,'-838:59:59'), (3,'838:59:59')");
+        }
+    }
+
+    /** Closes the job's resources and drops the per-test database. */
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("DROP DATABASE IF EXISTS " + database);
+        }
+    }
+
+    @Test
+    void timeFullRangeInBothPhases() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "t_time",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_time");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+            Map<Integer, JsonNode> snap = indexById(snapshot.records());
+            // Report missing rows by id instead of failing with an NPE on snap.get(...).
+            assertThat(snap).as("snapshot rows by id").containsKeys(1, 2, 3);
+
+            // snapshot phase: in-range keeps LocalTime text, out-of-range formats as ±HH:MM:SS
+            assertThat(snap.get(1).get("t_col").asText()).isEqualTo("12:34:56.123456");
+            assertThat(snap.get(2).get("t_col").asText()).isEqualTo("-838:59:59");
+            assertThat(snap.get(3).get("t_col").asText()).isEqualTo("838:59:59");
+
+            // binlog phase: a negative value beyond -24h is formatted as text, and midnight
+            // (in-range) keeps its LocalTime text.
+            try (Connection conn = rootConnection(database);
+                    Statement st = conn.createStatement()) {
+                st.execute(
+                        "INSERT INTO t_time VALUES (101,'-100:00:00.500000'), (102,'00:00:00')");
+            }
+            List<String> binlog = harness.readBinlogUntil(snapshot, splits, 2, Duration.ofSeconds(60));
+            Map<Integer, JsonNode> bin = indexById(binlog);
+            assertThat(bin).as("binlog rows by id").containsKeys(101, 102);
+            assertThat(bin.get(101).get("t_col").asText()).isEqualTo("-100:00:00.5");
+            assertThat(bin.get(102).get("t_col").asText()).isEqualTo("00:00");
+        }
+    }
+
+    /** Parses JSON records and indexes them by their {@code id} field (last one wins). */
+    private Map<Integer, JsonNode> indexById(List<String> records) throws Exception {
+        Map<Integer, JsonNode> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node);
+        }
+        return result;
+    }
+
+    /** Opens a root JDBC connection to the container, optionally bound to {@code db}. */
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
new file mode 100644
index 00000000000..3279d5f5162
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Guards that the snapshot phase (JDBC) and the binlog phase (Debezium decoding) — two different
+ * converter paths — deserialize the same MySQL value identically. The identical value is inserted
+ * once before capture (snapshot, id 1) and once after (binlog, id 2); every column must match
+ * across phases, and a column emitted by only one phase counts as a mismatch. Any difference is a
+ * real bug (the MySQL TIME out-of-range issue was one instance).
+ *
+ * <p>JSON is compared by parsed value, not text: MySQL returns snapshot JSON with key reordering and
+ * spaces ({@code {"a": [2, 3], "k": 1}}) while the binlog path emits compact JSON
+ * ({@code {"a":[2,3],"k":1}}) — same value, different text, so a semantic comparison is used.
+ */
+@Testcontainers
+class MySqlTypeConsistencyITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(821_000);
+
+    // Identical column tuple used for both the snapshot row (id 1) and the binlog row (id 2).
+    private static final String VALUES =
+            "200," // c_tinyint_u TINYINT UNSIGNED
+                    + "4000000000," // c_int_u INT UNSIGNED
+                    + "18446744073709551615," // c_bigint_u BIGINT UNSIGNED
+                    + "12345678901234.567890," // c_decimal DECIMAL(20,6)
+                    + "1.5," // c_float FLOAT
+                    + "3.141592653589793," // c_double DOUBLE
+                    + "1," // c_bool TINYINT(1)
+                    + "'b'," // c_enum
+                    + "'x,z'," // c_set
+                    + "b'10100101'," // c_bit BIT(8)
+                    + "'{\"k\": 1, \"a\": [2, 3]}'," // c_json
+                    + "'2023-08-15 12:34:56.123456'," // c_datetime DATETIME(6)
+                    + "'2023-08-15 12:34:56.123456'," // c_timestamp TIMESTAMP(6)
+                    + "'2023-08-15'," // c_date DATE
+                    + "'12:34:56.123456'," // c_time TIME(6)
+                    + "2023," // c_year YEAR
+                    + "0x0102030405," // c_varbinary VARBINARY(8)
+                    + "'abcde'"; // c_char CHAR(5)
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withConfigurationOverride("docker/server-allow-ancient-date-time")
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a per-test database with the wide-type table and its snapshot row (id 1). */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "typescan_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute(
+                    "CREATE TABLE t_scan ("
+                            + "id INT NOT NULL,"
+                            + "c_tinyint_u TINYINT UNSIGNED,"
+                            + "c_int_u INT UNSIGNED,"
+                            + "c_bigint_u BIGINT UNSIGNED,"
+                            + "c_decimal DECIMAL(20,6),"
+                            + "c_float FLOAT,"
+                            + "c_double DOUBLE,"
+                            + "c_bool TINYINT(1),"
+                            + "c_enum ENUM('a','b','c'),"
+                            + "c_set SET('x','y','z'),"
+                            + "c_bit BIT(8),"
+                            + "c_json JSON,"
+                            + "c_datetime DATETIME(6),"
+                            + "c_timestamp TIMESTAMP(6) NULL,"
+                            + "c_date DATE,"
+                            + "c_time TIME(6),"
+                            + "c_year YEAR,"
+                            + "c_varbinary VARBINARY(8),"
+                            + "c_char CHAR(5),"
+                            + "PRIMARY KEY (id))");
+            st.execute("INSERT INTO t_scan VALUES (1," + VALUES + ")");
+        }
+    }
+
+    /** Closes the job's resources and drops the per-test database. */
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("DROP DATABASE IF EXISTS " + database);
+        }
+    }
+
+    @Test
+    void snapshotAndBinlogDeserializeIdentically() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "t_scan",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_scan");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+            assertThat(snapshot.records()).as("snapshot records").isNotEmpty();
+            JsonNode snap = MAPPER.readTree(snapshot.records().get(0));
+
+            // identical value inserted after capture starts -> binlog path
+            try (Connection conn = rootConnection(database);
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO t_scan VALUES (2," + VALUES + ")");
+            }
+            List<String> binlog = harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).as("binlog records").isNotEmpty();
+            JsonNode bin = MAPPER.readTree(binlog.get(0));
+
+            // Walk the union of both records' columns, so a column emitted by only one phase is
+            // reported as a mismatch instead of being silently skipped.
+            List<String> columns = new ArrayList<>();
+            collectDataColumns(snap, columns);
+            collectDataColumns(bin, columns);
+
+            List<String> mismatches = new ArrayList<>();
+            for (String col : columns) {
+                JsonNode snapNode = snap.get(col);
+                JsonNode binNode = bin.get(col);
+                if (!columnsMatch(col, snapNode, binNode)) {
+                    mismatches.add(col + ": snapshot=[" + snapNode + "] binlog=[" + binNode + "]");
+                }
+            }
+            mismatches.forEach(m -> System.out.println("[TYPE SCAN][MISMATCH] " + m));
+            assertThat(mismatches).as("snapshot vs binlog per-column mismatches").isEmpty();
+        }
+    }
+
+    /**
+     * Appends the data-column names of {@code record} to {@code into}, skipping {@code id},
+     * {@code __DORIS*} metadata fields, and names already present (first-seen order is kept).
+     */
+    private static void collectDataColumns(JsonNode record, List<String> into) {
+        Iterator<String> names = record.fieldNames();
+        while (names.hasNext()) {
+            String col = names.next();
+            if (col.equals("id") || col.startsWith("__DORIS") || into.contains(col)) {
+                continue;
+            }
+            into.add(col);
+        }
+    }
+
+    // Compare the parsed nodes directly, so container columns (objects/arrays) compare by content
+    // rather than collapsing to "" via JsonNode.asText(). The whitespace/key-order tolerance is
+    // limited to the JSON column: a JSON value carried as a string can differ in spacing/key order
+    // between the snapshot (JDBC) and binlog paths, so it is compared by parsed value. Every other
+    // column must match exactly, so a real representation difference is never masked. A column
+    // missing from either side (null node) never matches.
+    private boolean columnsMatch(String col, JsonNode a, JsonNode b) {
+        if (a == null || b == null) {
+            return false;
+        }
+        if (a.equals(b)) {
+            return true;
+        }
+        if (col.equals("c_json")) {
+            try {
+                return MAPPER.readTree(a.asText()).equals(MAPPER.readTree(b.asText()));
+            } catch (Exception e) {
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /** Opens a root JDBC connection to the container, optionally bound to {@code db}. */
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
new file mode 100644
index 00000000000..2b97cfdda4a
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Guards that the MySQL TIME out-of-range fix (DebeziumJsonDeserializer.convertToTime) does not
+ * regress PostgreSQL time handling. In-range PG time values must stay byte-for-byte on the
+ * unchanged LocalTime branch, and the PG-legal boundary '24:00:00' is now formatted as text
+ * instead of falling back to the raw long.
+ */
+@Testcontainers
+class PostgresTimeRangeITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(585_000);
+
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+    private String table;
+
+    /** Creates a per-run table holding an in-range value and the 24:00:00 boundary. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        // per-run table name so concurrent forks / parallel runs cannot collide on a shared table.
+        table = "t_time_" + jobId;
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+            st.execute("CREATE TABLE " + table + " (id INT PRIMARY KEY, t_col TIME(6))");
+            st.execute("ALTER TABLE " + table + " REPLICA IDENTITY FULL");
+            // id 1: ordinary in-range value -- the fix must leave it byte-for-byte unchanged.
+            // id 2: PG-legal upper boundary 24:00:00 -- a raw-long fallback before the fix.
+            st.execute("INSERT INTO " + table + " VALUES (1,'12:34:56.123456'), (2,'24:00:00')");
+        }
+    }
+
+    /** Closes the job's resources and drops the per-run table. */
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+        }
+    }
+
+    @Test
+    void pgTimeUnaffectedByMysqlTimeFix() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        table,
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits(table);
+            CdcClientReadHarness.SnapshotResult snapshot = harness.readSnapshot(splits);
+            Map<Integer, JsonNode> snap = indexById(snapshot.records());
+            // Report missing rows by id instead of failing with an NPE on snap.get(...).
+            assertThat(snap).as("snapshot rows by id").containsKeys(1, 2);
+
+            // in-range ordinary value is unchanged (the PG common case)
+            assertThat(snap.get(1).get("t_col").asText()).isEqualTo("12:34:56.123456");
+            // PG-legal 24:00:00 is now formatted instead of emitted as a raw long
+            assertThat(snap.get(2).get("t_col").asText()).isEqualTo("24:00:00");
+        }
+    }
+
+    /** Parses JSON records and indexes them by their {@code id} field (last one wins). */
+    private Map<Integer, JsonNode> indexById(List<String> records) throws Exception {
+        Map<Integer, JsonNode> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node);
+        }
+        return result;
+    }
+
+    /** Opens a JDBC connection to the container's default database. */
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword());
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
new file mode 100644
index 00000000000..0f44ffe1c9b
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PostgreSQL counterpart of {@link MySqlTypeConsistencyITCase}: the same value inserted before
+ * capture (snapshot, JDBC path) and after (binlog, logical-decoding path) must deserialize
+ * identically per column. JSON/JSONB are compared by parsed value to tolerate whitespace/key-order
+ * differences while still catching a genuinely different value. Columns are taken from the union
+ * of both rows, so a column emitted by only one of the two paths is reported, not skipped.
+ */
+@Testcontainers
+class PostgresTypeConsistencyITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    // Per-test job ids; the test table name is derived from the id as well.
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(595_000);
+
+    // Identical column tuple for both the snapshot row (id 1) and the binlog row (id 2).
+    private static final String VALUES =
+            "12345678901234.567890," // c_numeric NUMERIC(20,6)
+                    + "1.5," // c_real REAL
+                    + "3.141592653589793," // c_double DOUBLE PRECISION
+                    + "true," // c_bool BOOLEAN
+                    + "'hello'," // c_text TEXT
+                    + "'world'," // c_varchar VARCHAR(20)
+                    + "'abcde'," // c_char CHAR(5)
+                    + "'{\"k\": 1, \"a\": [2, 3]}'," // c_json JSON
+                    + "'{\"k\": 1, \"a\": [2, 3]}'," // c_jsonb JSONB
+                    + "B'10100101'," // c_bit BIT(8)
+                    + "B'101'," // c_varbit VARBIT(16)
+                    + "'\\x0102030405'," // c_bytea BYTEA
+                    + "'11111111-1111-1111-1111-111111111111'," // c_uuid UUID
+                    + "'2023-08-15'," // c_date DATE
+                    + "'12:34:56.123456'," // c_time TIME(6)
+                    + "'12:34:56.123456+08'," // c_timetz TIMETZ
+                    + "'2023-08-15 12:34:56.123456'," // c_timestamp TIMESTAMP(6)
+                    + "'2023-08-15 12:34:56.123456+08'," // c_timestamptz TIMESTAMPTZ
+                    + "'1 day 02:03:04'," // c_interval INTERVAL
+                    + "'192.168.0.1'," // c_inet INET
+                    + "'{1,2,3}'," // c_int_arr INT[]
+                    + "'{a,b}'," // c_text_arr TEXT[]
+                    + "'12.34'"; // c_money MONEY
+
+    // wal_level=logical is required for the logical-decoding (binlog-side) read.
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+    private String table;
+
+    /** Allocates a fresh job id / table and inserts row 1 before capture starts. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        table = "t_scan_" + jobId;
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+            st.execute(
+                    "CREATE TABLE " + table + " ("
+                            + "id INT PRIMARY KEY,"
+                            + "c_numeric NUMERIC(20,6),"
+                            + "c_real REAL,"
+                            + "c_double DOUBLE PRECISION,"
+                            + "c_bool BOOLEAN,"
+                            + "c_text TEXT,"
+                            + "c_varchar VARCHAR(20),"
+                            + "c_char CHAR(5),"
+                            + "c_json JSON,"
+                            + "c_jsonb JSONB,"
+                            + "c_bit BIT(8),"
+                            + "c_varbit VARBIT(16),"
+                            + "c_bytea BYTEA,"
+                            + "c_uuid UUID,"
+                            + "c_date DATE,"
+                            + "c_time TIME(6),"
+                            + "c_timetz TIMETZ,"
+                            + "c_timestamp TIMESTAMP(6),"
+                            + "c_timestamptz TIMESTAMPTZ,"
+                            + "c_interval INTERVAL,"
+                            + "c_inet INET,"
+                            + "c_int_arr INT[],"
+                            + "c_text_arr TEXT[],"
+                            + "c_money MONEY)");
+            st.execute("ALTER TABLE " + table + " REPLICA IDENTITY FULL");
+            st.execute("INSERT INTO " + table + " VALUES (1," + VALUES + ")");
+        }
+    }
+
+    /**
+     * Closes whatever the shared {@code Env} holds for this job id, then drops the test table. The
+     * drop runs in a {@code finally} so the table is still removed when closing the job throws.
+     */
+    @AfterEach
+    void tearDown() throws Exception {
+        try {
+            Env.getCurrentEnv().close(jobId);
+        } finally {
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP TABLE IF EXISTS " + table);
+            }
+        }
+    }
+
+    /**
+     * Reads row 1 through the snapshot path and row 2 (same {@link #VALUES}) through the binlog
+     * path, then asserts every non-metadata column deserialized identically on both paths.
+     */
+    @Test
+    void snapshotAndBinlogDeserializeIdentically() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        table,
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits(table);
+            CdcClientReadHarness.SnapshotResult snapshot = harness.readSnapshot(splits);
+            // Only row 1 existed before capture; fail with a message rather than an IOOBE.
+            assertThat(snapshot.records()).as("snapshot records").hasSize(1);
+            JsonNode snap = MAPPER.readTree(snapshot.records().get(0));
+
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO " + table + " VALUES (2," + VALUES + ")");
+            }
+            List<String> binlog = harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            // Guard get(0) so an empty read fails with a clear message, and make sure the row
+            // being compared is the post-capture insert (id 2), not something else.
+            assertThat(binlog).as("binlog records").isNotEmpty();
+            JsonNode bin = MAPPER.readTree(binlog.get(0));
+            assertThat(bin.path("id").asInt()).as("binlog row id").isEqualTo(2);
+
+            List<String> mismatches = new ArrayList<>();
+            for (String col : columnNames(snap, bin)) {
+                if (col.equals("id") || col.startsWith("__DORIS")) {
+                    continue;
+                }
+                JsonNode snapNode = snap.get(col);
+                JsonNode binNode = bin.get(col);
+                if (!columnsMatch(col, snapNode, binNode)) {
+                    mismatches.add(col + ": snapshot=[" + snapNode + "] binlog=[" + binNode + "]");
+                }
+            }
+            mismatches.forEach(m -> System.out.println("[PG TYPE SCAN][MISMATCH] " + m));
+            assertThat(mismatches).as("snapshot vs binlog per-column mismatches").isEmpty();
+        }
+    }
+
+    // Union of the field names of all given rows, in first-seen order. Iterating only the
+    // snapshot row's fields would silently skip a column that only the binlog path emits; with
+    // the union such a column reaches columnsMatch with a null side and is reported.
+    private static List<String> columnNames(JsonNode... rows) {
+        List<String> names = new ArrayList<>();
+        for (JsonNode row : rows) {
+            for (Iterator<String> it = row.fieldNames(); it.hasNext(); ) {
+                String name = it.next();
+                if (!names.contains(name)) {
+                    names.add(name);
+                }
+            }
+        }
+        return names;
+    }
+
+    // Compare the parsed nodes directly so container columns (arrays such as c_int_arr/c_text_arr)
+    // compare by content rather than collapsing to "" via JsonNode.asText(). The whitespace/key-order
+    // tolerance is limited to the JSON/JSONB columns: a JSON value carried as a string can differ in
+    // spacing/key order between the snapshot (JDBC) and binlog paths, so those are compared by parsed
+    // value; every other column must match exactly so a real representation difference is never
+    // masked. A column missing on either side (null node) is always a mismatch.
+    private boolean columnsMatch(String col, JsonNode a, JsonNode b) {
+        if (a == null || b == null) {
+            return false;
+        }
+        if (a.equals(b)) {
+            return true;
+        }
+        if (col.equals("c_json") || col.equals("c_jsonb")) {
+            try {
+                return MAPPER.readTree(a.asText()).equals(MAPPER.readTree(b.asText()));
+            } catch (Exception e) {
+                // Either side is not parseable JSON text: treat as a genuine mismatch.
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /** Opens a new JDBC connection to the test container; the caller must close it. */
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword());
+    }
+}
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
index 75b2ffbfe9e..404ff7f1d56 100644
--- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -24,8 +24,12 @@ import java.time.ZoneId;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
/** Unit tests for {@link DebeziumJsonDeserializer}. */
class DebeziumJsonDeserializerTest {
@@ -75,6 +79,106 @@ class DebeziumJsonDeserializerTest {
}
}
+    // ─── convertToTime (MySQL TIME full range) ────────────────────────────────
+    // MySQL TIME spans [-838:59:59, 838:59:59] (Debezium MicroTime/NanoTime, long micros/nanos).
+    // In-range values keep the LocalTime format; out-of-range (negative or >=24h) must format
+    // as ±HH:MM:SS[.ffffff] instead of falling back to the raw long literal.
+
+    @Test
+    void microTime_zero_isMidnight() {
+        // Midnight renders as "00:00": the LocalTime text form omits zero seconds.
+        assertEquals("00:00", invokeConvertToTime(MicroTime.SCHEMA_NAME, 0L));
+    }
+
+    @Test
+    void microTime_inRange_withMicros() {
+        // 45_296 s = 12h 34m 56s, plus 123_456 micros.
+        assertEquals("12:34:56.123456", invokeConvertToTime(MicroTime.SCHEMA_NAME, 45_296_123_456L));
+    }
+
+    @Test
+    void microTime_inRange_upperBound() {
+        // One microsecond below 24h: the last value that stays on the in-range path.
+        assertEquals("23:59:59.999999", invokeConvertToTime(MicroTime.SCHEMA_NAME, 86_399_999_999L));
+    }
+
+    @Test
+    void microTime_negative_mysqlLowerBound() {
+        // MySQL '-838:59:59' = -3_020_399_000_000 micros; must not fall back to the raw long.
+        assertEquals("-838:59:59", invokeConvertToTime(MicroTime.SCHEMA_NAME, -3_020_399_000_000L));
+    }
+
+    @Test
+    void microTime_over24h_mysqlUpperBound() {
+        // MySQL '838:59:59.999999' = 3_020_399_999_999 micros.
+        assertEquals(
+                "838:59:59.999999", invokeConvertToTime(MicroTime.SCHEMA_NAME, 3_020_399_999_999L));
+    }
+
+    @Test
+    void nanoTime_negative_mysqlLowerBound() {
+        // Same '-838:59:59' bound as above, expressed in nanoseconds (micros * 1000).
+        assertEquals(
+                "-838:59:59", invokeConvertToTime(NanoTime.SCHEMA_NAME, -3_020_399_000_000_000L));
+    }
+
+    /**
+     * Calls the private {@code convertToTime(Object, Schema)} on the test's {@code deserializer}
+     * reflectively, passing an optional int64 schema named {@code schemaName} (e.g. {@link
+     * MicroTime#SCHEMA_NAME} or {@link NanoTime#SCHEMA_NAME}). Any reflective failure, including
+     * an exception thrown by convertToTime itself, is rethrown as a RuntimeException with the
+     * original as its cause.
+     */
+    private Object invokeConvertToTime(String schemaName, Object dbzObj) {
+        try {
+            Schema schema = SchemaBuilder.int64().name(schemaName).optional().build();
+            Method m =
+                    DebeziumJsonDeserializer.class.getDeclaredMethod(
+                            "convertToTime", Object.class, Schema.class);
+            m.setAccessible(true);
+            return m.invoke(deserializer, dbzObj, schema);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // ─── formatTimeText (fraction padding / trailing-zero stripping) ───────────
+    // convertToTime above only routes through formatTimeText with whole-second or
+    // full-6-digit fractions, so the sub-second padding and trailing-zero stripping
+    // branches are exercised directly here. Inputs are total microseconds.
+
+    @Test
+    void formatTimeText_trailingZerosStrippedToSingleDigit() {
+        // 500_000 micros = 0.5 s -> ".500000" with trailing zeros stripped.
+        assertEquals("00:00:00.5", invokeFormatTimeText(500_000L));
+    }
+
+    @Test
+    void formatTimeText_trailingZerosStrippedToTwoDigits() {
+        // 120_000 micros = 0.12 s.
+        assertEquals("00:00:00.12", invokeFormatTimeText(120_000L));
+    }
+
+    @Test
+    void formatTimeText_millisecondFraction() {
+        // 123_000 micros = 123 ms.
+        assertEquals("00:00:00.123", invokeFormatTimeText(123_000L));
+    }
+
+    @Test
+    void formatTimeText_subMicroFractionLeftPadded() {
+        // 5 micros -> ".000005": padded to six digits, no trailing zero to strip.
+        assertEquals("00:00:00.000005", invokeFormatTimeText(5L));
+    }
+
+    @Test
+    void formatTimeText_negativeWholeSecondPadsHourAndMinute() {
+        // -30 minutes: hour/minute keep two digits, no fractional part.
+        assertEquals("-00:30:00", invokeFormatTimeText(-1_800_000_000L));
+    }
+
+    @Test
+    void formatTimeText_negativeKeepsSignBeforeFraction() {
+        // -0.5 s: the sign prefixes the whole text, not just the fraction.
+        assertEquals("-00:00:00.5", invokeFormatTimeText(-500_000L));
+    }
+
+    /**
+     * Calls the private static {@code formatTimeText(long)} reflectively (null receiver) with a
+     * total microsecond count. Reflective failures are rethrown as RuntimeException with cause.
+     */
+    private String invokeFormatTimeText(long microsTotal) {
+        try {
+            Method m =
+                    DebeziumJsonDeserializer.class.getDeclaredMethod("formatTimeText", long.class);
+            m.setAccessible(true);
+            return (String) m.invoke(null, microsTotal);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
// ─── convertZoneTime
──────────────────────────────────────────────────────
// timetz arrives as a UTC-normalized ISO string (Debezium ZonedTime). cdc
keeps it
// verbatim with the offset preserved, independent of serverTimeZone,
since a
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java
new file mode 100644
index 00000000000..2646b981124
--- /dev/null
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.github.shyiko.mysql.binlog.GtidSet;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Guard for MySQL GTID sets that carry more than one interval per server uuid (a "split
+ * interval", e.g. {@code uuid:1-3:5-7} left behind when the gap transaction 4 was purged).
+ *
+ * <p>We delegate all GTID parsing/serialization to the binlog connector's {@link GtidSet} and
+ * never split the string ourselves, so the go-mysql #550 class of bug (a single-interval regex
+ * that silently merges split intervals and persists a corrupt resume position → ERROR 1236)
+ * cannot regress unnoticed. These tests pin that the split interval survives both GtidSet
+ * parsing/round-trip and our FE offset-map persistence without the gap being swallowed.
+ */
+class GtidMultiIntervalOffsetTest {
+
+    private static final String SERVER_UUID = "24bc7850-2c16-11e6-a073-0242ac110002";
+    // Two disjoint intervals for the same uuid, with a purged gap at transaction 4.
+    private static final String MULTI_INTERVAL_GTID = SERVER_UUID + ":1-3:5-7";
+
+    /** Parsing yields exactly the intervals [1,3] and [5,7], in that order. */
+    @Test
+    void splitIntervalParsesAsTwoIntervalsNotMergedIntoOne() {
+        List<GtidSet.Interval> intervals =
+                new GtidSet(MULTI_INTERVAL_GTID).getUUIDSet(SERVER_UUID).getIntervals();
+        // The gap at txn 4 must keep this as two intervals, never collapsed into a single 1-7.
+        assertEquals(2, intervals.size());
+        assertEquals(1, intervals.get(0).getStart());
+        assertEquals(3, intervals.get(0).getEnd());
+        assertEquals(5, intervals.get(1).getStart());
+        assertEquals(7, intervals.get(1).getEnd());
+    }
+
+    /** GtidSet.toString output, parsed again, still has two intervals for the uuid. */
+    @Test
+    void splitIntervalSurvivesGtidSetSerializationRoundTrip() {
+        // Re-serializing then re-parsing must not lose the gap (toString -> parse -> two intervals).
+        String reSerialized = new GtidSet(MULTI_INTERVAL_GTID).toString();
+        assertEquals(
+                2, new GtidSet(reSerialized).getUUIDSet(SERVER_UUID).getIntervals().size());
+    }
+
+    // BinlogOffset-layer guard only; the real generateMySqlConfig resume path is covered by
+    // MySqlSourceReaderTest / MySqlStartupGtidOffsetITCase.
+    @Test
+    void splitIntervalSurvivesFeOffsetMapRoundTrip() {
+        // FE persists/restores the binlog offset as a string-valued map; rebuild from it.
+        Map<String, String> feOffset = new HashMap<>();
+        feOffset.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "mysql-bin.000004");
+        feOffset.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "1024");
+        feOffset.put(BinlogOffset.GTID_SET_KEY, MULTI_INTERVAL_GTID);
+
+        // Pass a copy so the assertions below read only what BinlogOffset kept.
+        BinlogOffset restored = new BinlogOffset(new HashMap<>(feOffset));
+
+        // The gtids value is opaque to JSON and must come back byte-for-byte.
+        assertEquals(MULTI_INTERVAL_GTID, restored.getGtidSet());
+        // And it must still parse into two intervals after the round trip.
+        assertEquals(
+                2, new GtidSet(restored.getGtidSet()).getUUIDSet(SERVER_UUID).getIntervals().size());
+    }
+
+    /** Single-transaction sets inside either interval are contained; the purged txn 4 is not. */
+    @Test
+    void purgedGapTransactionStaysOutsideTheSplitInterval() {
+        GtidSet multiInterval = new GtidSet(MULTI_INTERVAL_GTID);
+        // Transactions inside the two intervals are contained; the purged gap (txn 4) is not.
+        assertTrue(new GtidSet(SERVER_UUID + ":2-2").isContainedWithin(multiInterval));
+        assertTrue(new GtidSet(SERVER_UUID + ":6-6").isContainedWithin(multiInterval));
+        assertFalse(new GtidSet(SERVER_UUID + ":4-4").isContainedWithin(multiInterval));
+    }
+}
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
index 1192df291d3..92be9a68c3c 100644
--- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
@@ -18,12 +18,24 @@
package org.apache.doris.cdcclient.source.reader.mysql;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import com.github.shyiko.mysql.binlog.GtidSet;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
public class MySqlSourceReaderTest {
+ private static final String SERVER_UUID =
"24bc7850-2c16-11e6-a073-0242ac110002";
+
@Test
void testNormalizeSslModeMapsAllLegalValues() {
assertEquals("disabled",
MySqlSourceReader.normalizeSslModeForMysql("disable"));
@@ -59,4 +71,47 @@ public class MySqlSourceReaderTest {
IllegalArgumentException.class,
() -> MySqlSourceReader.normalizeSslModeForMysql("DISABLE"));
}
+
+    // A JSON resume offset carrying file/pos AND a multi-interval gtid set must keep the gtids
+    // (and a non-null kind) through generateMySqlConfig, not just file/pos.
+    @Test
+    void jsonOffsetWithGtidsPreservesMultiIntervalGtidSet() throws Exception {
+        String gtid = SERVER_UUID + ":1-3:5-7";
+        BinlogOffset off =
+                startupBinlogOffset(
+                        "{\"file\":\"mysql-bin.000004\",\"pos\":\"1024\",\"gtids\":\""
+                                + gtid
+                                + "\"}");
+        // The gtid string comes back verbatim ...
+        assertEquals(gtid, off.getGtidSet());
+        assertNotNull(off.getOffsetKind());
+        // ... and still parses as two intervals, i.e. the gap at txn 4 was not merged away.
+        assertEquals(
+                2, new GtidSet(off.getGtidSet()).getUUIDSet(SERVER_UUID).getIntervals().size());
+    }
+
+    // A gtid-only JSON offset (no file/pos) is accepted, matching Flink CDC's specific-offset rules.
+    @Test
+    void jsonOffsetGtidOnlyIsAccepted() throws Exception {
+        String gtid = SERVER_UUID + ":1-7";
+        BinlogOffset off = startupBinlogOffset("{\"gtids\":\"" + gtid + "\"}");
+        assertEquals(gtid, off.getGtidSet());
+        assertNotNull(off.getOffsetKind());
+    }
+
+    /**
+     * Drives the real generateMySqlConfig JSON-offset path and returns the rebuilt startup offset.
+     *
+     * <p>Builds a minimal config map (placeholder JDBC url and credentials, database {@code
+     * testdb}, table {@code t_test}) whose {@code OFFSET} entry is {@code offsetJson}, invokes the
+     * private {@code generateMySqlConfig(Map, String, int)} reflectively as job {@code "job-1"}
+     * with {@code 0} for the int argument, and returns {@code getStartupOptions().binlogOffset}.
+     * An exception thrown inside generateMySqlConfig surfaces as an InvocationTargetException
+     * whose cause is the original error.
+     */
+    private BinlogOffset startupBinlogOffset(String offsetJson) throws Exception {
+        Map<String, String> cfg = new HashMap<>();
+        cfg.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://localhost:3306/testdb");
+        cfg.put(DataSourceConfigKeys.USER, "u");
+        cfg.put(DataSourceConfigKeys.PASSWORD, "p");
+        cfg.put(DataSourceConfigKeys.DATABASE, "testdb");
+        cfg.put(DataSourceConfigKeys.TABLE, "t_test");
+        cfg.put(DataSourceConfigKeys.OFFSET, offsetJson);
+        Method m =
+                MySqlSourceReader.class.getDeclaredMethod(
+                        "generateMySqlConfig", Map.class, String.class, int.class);
+        m.setAccessible(true);
+        MySqlSourceConfig config =
+                (MySqlSourceConfig) m.invoke(new MySqlSourceReader(), cfg, "job-1", 0);
+        return config.getStartupOptions().binlogOffset;
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]