This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6c1b33bb7da branch-4.0: [fix](streaming-job) restore split-bound Java 
types when reading FE-persisted CDC offset #63219 (#63274)
6c1b33bb7da is described below

commit 6c1b33bb7daa4aa68c94fdf8b327a67a35a972ba
Author: wudi <[email protected]>
AuthorDate: Wed May 27 12:25:25 2026 +0800

    branch-4.0: [fix](streaming-job) restore split-bound Java types when 
reading FE-persisted CDC offset #63219 (#63274)
    
    Cherry-picked from #63219
---
 fs_brokers/cdc_client/pom.xml                      |   7 +
 .../source/reader/AbstractCdcSourceReader.java     |  73 +++++++++
 .../source/reader/JdbcIncrementalSourceReader.java |  41 ++++-
 .../source/reader/mysql/MySqlSourceReader.java     |  62 ++++++--
 .../reader/postgres/PostgresSourceReader.java      |  23 +++
 .../source/reader/AbstractCdcSourceReaderTest.java | 165 +++++++++++++++++++++
 .../cdc/test_streaming_mysql_job_date_pk.out       |  29 ++++
 .../cdc/test_streaming_postgres_job_date_pk.out    |  29 ++++
 .../cdc/test_streaming_mysql_job_date_pk.groovy    | 129 ++++++++++++++++
 .../cdc/test_streaming_postgres_job_date_pk.groovy | 131 ++++++++++++++++
 10 files changed, 673 insertions(+), 16 deletions(-)

diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index cac45bdc7a1..afa186844b7 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -178,6 +178,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.2</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
new file mode 100644
index 00000000000..9be46307bab
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+
+public abstract class AbstractCdcSourceReader implements SourceReader {
+
+    private final Map<String, Class<?>> splitKeyClassCache = new 
ConcurrentHashMap<>();
+
+    protected abstract Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+    protected Class<?> resolveSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        String key = tableId.identifier() + "." + splitColumn.name();
+        return splitKeyClassCache.computeIfAbsent(
+                key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+    }
+
+    public static Object[] convertBounds(Object[] raw, Class<?> target, 
ObjectMapper mapper) {
+        if (raw == null) {
+            return null;
+        }
+        Object[] out = new Object[raw.length];
+        for (int i = 0; i < raw.length; i++) {
+            out[i] = convertBound(raw[i], target, mapper);
+        }
+        return out;
+    }
+
+    private static Object convertBound(Object v, Class<?> target, ObjectMapper 
mapper) {
+        if (v == null) {
+            return null;
+        }
+        if (target.isInstance(v)) {
+            return v;
+        }
+        String s = v.toString();
+        if (target == java.sql.Date.class) {
+            return java.sql.Date.valueOf(s);
+        }
+        if (target == java.sql.Timestamp.class) {
+            return java.sql.Timestamp.valueOf(s);
+        }
+        if (target == java.sql.Time.class) {
+            return java.sql.Time.valueOf(s);
+        }
+        return mapper.convertValue(v, target);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index c43595826e3..f4720579051 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -83,7 +83,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public abstract class JdbcIncrementalSourceReader implements SourceReader {
+public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
     private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
@@ -585,8 +585,6 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
             createSnapshotSplit(Map<String, Object> offset, JobBaseConfig 
jobConfig) {
         SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, 
SnapshotSplit.class);
         TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
-        Object[] splitStart = snapshotSplit.getSplitStart();
-        Object[] splitEnd = snapshotSplit.getSplitEnd();
         List<String> splitKeys = snapshotSplit.getSplitKey();
         Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(jobConfig);
         TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -594,7 +592,18 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
                 tableChange, "Can not find table " + tableId + " in job " + 
jobConfig.getJobId());
         // only support one split key
         String splitKey = splitKeys.get(0);
-        io.debezium.relational.Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+        Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Preconditions.checkNotNull(
+                splitColumn,
+                "Split key column "
+                        + splitKey
+                        + " not found in table "
+                        + tableId
+                        + " for job "
+                        + jobConfig.getJobId());
+        Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, 
jobConfig);
+        Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), 
keyClass, objectMapper);
+        Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), 
keyClass, objectMapper);
         RowType splitType = getSplitType(splitColumn);
         org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit 
split =
                 new 
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit(
@@ -631,6 +640,7 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
                             
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
                             .toList();
 
+            Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(config);
             for (SnapshotSplit split : assignedSplitLists) {
                 // find the min offset
                 Map<String, String> offsetMap = split.getHighWatermark();
@@ -641,12 +651,29 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
                 if (maxOffsetFinishSplits == null || 
sourceOffset.isAfter(maxOffsetFinishSplits)) {
                     maxOffsetFinishSplits = sourceOffset;
                 }
+                TableId tid = TableId.parse(split.getTableId());
+                TableChanges.TableChange tableChange = tableSchemas.get(tid);
+                Preconditions.checkNotNull(
+                        tableChange, "Can not find table " + tid + " in job " 
+ config.getJobId());
+                String splitKey = split.getSplitKey().get(0);
+                Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+                Preconditions.checkNotNull(
+                        splitColumn,
+                        "Split key column "
+                                + splitKey
+                                + " not found in table "
+                                + tid
+                                + " for job "
+                                + config.getJobId());
+                Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn, 
config);
+                Object[] start = convertBounds(split.getSplitStart(), 
keyClass, objectMapper);
+                Object[] end = convertBounds(split.getSplitEnd(), keyClass, 
objectMapper);
                 finishedSnapshotSplitInfos.add(
                         new FinishedSnapshotSplitInfo(
-                                TableId.parse(split.getTableId()),
+                                tid,
                                 split.getSplitId(),
-                                split.getSplitStart(),
-                                split.getSplitEnd(),
+                                start,
+                                end,
                                 sourceOffset,
                                 getOffsetFactory()));
             }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 5675b3dd835..477c0a36f01 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -20,8 +20,8 @@ package org.apache.doris.cdcclient.source.reader.mysql;
 import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
 import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
-import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.source.reader.SplitRecords;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -59,6 +59,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -67,7 +68,9 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -110,7 +113,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public class MySqlSourceReader implements SourceReader {
+public class MySqlSourceReader extends AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
     private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
@@ -579,8 +582,6 @@ public class MySqlSourceReader implements SourceReader {
             Map<String, Object> offset, JobBaseConfig jobConfig) throws 
JsonProcessingException {
         SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, 
SnapshotSplit.class);
         TableId tableId = TableId.parse(snapshotSplit.getTableId());
-        Object[] splitStart = snapshotSplit.getSplitStart();
-        Object[] splitEnd = snapshotSplit.getSplitEnd();
         List<String> splitKeys = snapshotSplit.getSplitKey();
         Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(jobConfig);
         TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -589,6 +590,17 @@ public class MySqlSourceReader implements SourceReader {
         // only support one split key
         String splitKey = splitKeys.get(0);
         Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Preconditions.checkNotNull(
+                splitColumn,
+                "Split key column "
+                        + splitKey
+                        + " not found in table "
+                        + tableId
+                        + " for job "
+                        + jobConfig.getJobId());
+        Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, 
jobConfig);
+        Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), 
keyClass, objectMapper);
+        Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), 
keyClass, objectMapper);
         RowType splitType = ChunkUtils.getChunkKeyColumnType(splitColumn, 
false);
         MySqlSnapshotSplit split =
                 new MySqlSnapshotSplit(
@@ -620,6 +632,7 @@ public class MySqlSourceReader implements SourceReader {
                             
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
                             .toList();
 
+            Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(config);
             for (SnapshotSplit split : assignedSplitLists) {
                 // find the min binlog offset
                 Map<String, String> offsetMap = split.getHighWatermark();
@@ -630,13 +643,26 @@ public class MySqlSourceReader implements SourceReader {
                 if (maxOffsetFinishSplits == null || 
binlogOffset.isAfter(maxOffsetFinishSplits)) {
                     maxOffsetFinishSplits = binlogOffset;
                 }
+                TableId tid = TableId.parse(split.getTableId());
+                TableChanges.TableChange tableChange = tableSchemas.get(tid);
+                Preconditions.checkNotNull(
+                        tableChange, "Can not find table " + tid + " in job " 
+ config.getJobId());
+                String splitKey = split.getSplitKey().get(0);
+                Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+                Preconditions.checkNotNull(
+                        splitColumn,
+                        "Split key column "
+                                + splitKey
+                                + " not found in table "
+                                + tid
+                                + " for job "
+                                + config.getJobId());
+                Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn, 
config);
+                Object[] start = convertBounds(split.getSplitStart(), 
keyClass, objectMapper);
+                Object[] end = convertBounds(split.getSplitEnd(), keyClass, 
objectMapper);
                 finishedSnapshotSplitInfos.add(
                         new FinishedSnapshotSplitInfo(
-                                TableId.parse(split.getTableId()),
-                                split.getSplitId(),
-                                split.getSplitStart(),
-                                split.getSplitEnd(),
-                                binlogOffset));
+                                tid, split.getSplitId(), start, end, 
binlogOffset));
             }
         }
 
@@ -980,6 +1006,24 @@ public class MySqlSourceReader implements SourceReader {
         return schemas;
     }
 
+    @Override
+    protected Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        MySqlSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        String sql =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1=0",
+                        StatementUtils.quote(splitColumn.name()), 
StatementUtils.quote(tableId));
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig);
+                Statement st = jdbc.connection().createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return Class.forName(rs.getMetaData().getColumnClassName(1));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Probe split key class failed for " + tableId + "." + 
splitColumn.name(), e);
+        }
+    }
+
     private Map<TableId, TableChanges.TableChange> 
discoverTableSchemas(JobBaseConfig config) {
         MySqlSourceConfig sourceConfig = getSourceConfig(config);
         try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index bb8c0bb904e..607f68aea64 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -47,10 +47,13 @@ import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetch
 import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
 import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.table.types.DataType;
 
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
@@ -288,6 +291,26 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         return PostgresTypeUtils.fromDbzColumn(splitColumn);
     }
 
+    @Override
+    protected Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        String sql =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1=0",
+                        PostgresQueryUtils.quote(splitColumn.name()),
+                        PostgresQueryUtils.quote(tableId));
+        try (JdbcConnection jdbc =
+                        new 
PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig);
+                Statement st = jdbc.connection().createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return Class.forName(rs.getMetaData().getColumnClassName(1));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Probe split key class failed for " + tableId + "." + 
splitColumn.name(), e);
+        }
+    }
+
     /**
      * Why not call dialect.displayCurrentOffset(sourceConfig) ? The 
underlying system calls
      * `txid_current()` to advance the WAL log. Here, it's just a query; 
retrieving the LSN is
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
new file mode 100644
index 00000000000..fbca1ad41e6
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class AbstractCdcSourceReaderTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Test
+    void convertBoundsRestoresDateFromString() {
+        Object[] raw = new Object[] {"2025-01-06"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertTrue(out[0] instanceof Date);
+        assertEquals(Date.valueOf("2025-01-06"), out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresTimestampFromString() {
+        Object[] raw = new Object[] {"2025-01-06 12:34:56"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
Timestamp.class, MAPPER);
+        assertTrue(out[0] instanceof Timestamp);
+        assertEquals(Timestamp.valueOf("2025-01-06 12:34:56"), out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresLongFromJsonInteger() {
+        // Jackson typically deserializes JSON integer to Integer; restore as 
Long for int8 cols.
+        Object[] raw = new Object[] {123456};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Long.class, 
MAPPER);
+        assertTrue(out[0] instanceof Long);
+        assertEquals(123456L, out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresBigDecimalFromString() {
+        Object[] raw = new Object[] {"12.34"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
BigDecimal.class, MAPPER);
+        assertTrue(out[0] instanceof BigDecimal);
+        assertEquals(new BigDecimal("12.34"), out[0]);
+    }
+
+    @Test
+    void convertBoundsPreservesStringForVarcharColumns() {
+        Object[] raw = new Object[] {"event_id_42"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
String.class, MAPPER);
+        assertEquals("event_id_42", out[0]);
+    }
+
+    @Test
+    void convertBoundsReturnsNullForNullInput() {
+        assertNull(AbstractCdcSourceReader.convertBounds(null, Date.class, 
MAPPER));
+    }
+
+    @Test
+    void convertBoundsKeepsNullElement() {
+        Object[] raw = new Object[] {null};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertEquals(1, out.length);
+        assertNull(out[0]);
+    }
+
+    @Test
+    void convertBoundsHandlesMultiElementArray() {
+        Object[] raw = new Object[] {"2025-01-06", null};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertEquals(2, out.length);
+        assertEquals(Date.valueOf("2025-01-06"), out[0]);
+        assertNull(out[1]);
+    }
+
+    @Test
+    void restoresDateChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.events:0");
+        feOffset.put("tableId", "public.events");
+        feOffset.put("splitKey", Arrays.asList("event_date"));
+        feOffset.put("splitStart", null);
+        feOffset.put("splitEnd", Arrays.asList("2025-01-06"));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+        assertEquals(String.class, deserialized.getSplitEnd()[0].getClass());
+
+        Object[] restored =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Date.class, MAPPER);
+        assertEquals(Date.class, restored[0].getClass());
+        assertEquals(Date.valueOf("2025-01-06"), restored[0]);
+    }
+
+    @Test
+    void restoresTimestampChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.orders:7");
+        feOffset.put("tableId", "public.orders");
+        feOffset.put("splitKey", Arrays.asList("created_at"));
+        feOffset.put("splitStart", Arrays.asList("2025-01-06 00:00:00"));
+        feOffset.put("splitEnd", Arrays.asList("2025-01-07 00:00:00"));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+
+        Object[] restoredStart =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitStart(), Timestamp.class, MAPPER);
+        Object[] restoredEnd =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Timestamp.class, MAPPER);
+        assertEquals(Timestamp.class, restoredStart[0].getClass());
+        assertEquals(Timestamp.valueOf("2025-01-06 00:00:00"), 
restoredStart[0]);
+        assertEquals(Timestamp.valueOf("2025-01-07 00:00:00"), restoredEnd[0]);
+    }
+
+    @Test
+    void restoresBigintChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.orders:0");
+        feOffset.put("tableId", "public.orders");
+        feOffset.put("splitKey", Arrays.asList("id"));
+        feOffset.put("splitStart", Arrays.asList(100));
+        feOffset.put("splitEnd", Arrays.asList(200));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+
+        Object[] restoredStart =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitStart(), Long.class, MAPPER);
+        Object[] restoredEnd =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Long.class, MAPPER);
+        assertEquals(Long.class, restoredStart[0].getClass());
+        assertEquals(100L, restoredStart[0]);
+        assertEquals(200L, restoredEnd[0]);
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01     A1
+2025-01-02     B1
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01     1       A2
+2025-02-02     2       B2
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+
+-- !select_binlog_date_pk --
+2025-01-02     B1_upd
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+2025-01-06     F1
+
+-- !select_binlog_composite_pk --
+2025-02-02     2       B2_upd
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+2025-02-06     6       F2
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01     A1
+2025-01-02     B1
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01     1       A2
+2025-02-02     2       B2
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+
+-- !select_binlog_date_pk --
+2025-01-02     B1_upd
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+2025-01-06     F1
+
+-- !select_binlog_composite_pk --
+2025-02-02     2       B2_upd
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+2025-02-06     6       F2
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
new file mode 100644
index 00000000000..85570960de5
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_date_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_date_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDate = "events_mysql_date_pk"
+    def tableComposite = "events_mysql_date_id_pk"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDate} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDate}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableDate} (
+                  `event_date` date NOT NULL,
+                  `payload` varchar(200) DEFAULT NULL,
+                  PRIMARY KEY (`event_date`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-01', 'A1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-02', 'B1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-03', 'C1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-04', 'D1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-05', 'E1')"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+                  `event_date` date NOT NULL,
+                  `id` int NOT NULL,
+                  `payload` varchar(200) DEFAULT NULL,
+                  PRIMARY KEY (`event_date`, `id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-01', 1, 'A2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-02', 2, 'B2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-03', 3, 'C2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-04', 4, 'D2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-05', 5, 'E2')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableDate},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "1",
+                    "snapshot_parallelism" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Wait (up to 300s, polling every second) until the streaming job reports at least 5 succeeded tasks.
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                log.info("jobSuccendCount: " + jobSuccendCount)
+                // Compare numerically: as strings '5' <= '10' is false, so counts 10..49 would never satisfy the wait.
+                jobSuccendCount.size() == 1 && ((jobSuccendCount.get(0).get(0) as String) ?: '0').toInteger() >= 5
+            })
+        } catch (Exception ex) {
+            // On timeout or query failure, log the job and task rows for diagnosis, then rethrow to fail the suite.
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-06', 'F1')"""
+            sql """UPDATE ${mysqlDb}.${tableDate} SET payload = 'B1_upd' WHERE 
event_date = '2025-01-02'"""
+            sql """DELETE FROM ${mysqlDb}.${tableDate} WHERE event_date = 
'2025-01-01'"""
+
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-06', 6, 'F2')"""
+            sql """UPDATE ${mysqlDb}.${tableComposite} SET payload = 'B2_upd' 
WHERE event_date = '2025-02-02' AND id = 2"""
+            sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE event_date = 
'2025-02-01' AND id = 1"""
+        }
+
+        sleep(60000)
+
+        qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        def jobInfo = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert jobInfo.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
new file mode 100644
index 00000000000..ec43d22f988
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_date_pk", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_date_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDate = "events_pg_date_pk"
+    def tableComposite = "events_pg_date_id_pk"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDate} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} (
+                  "event_date" date PRIMARY KEY,
+                  "payload" varchar(200)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-01', 'A1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-02', 'B1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-03', 'C1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-04', 'D1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-05', 'E1');"""
+
+            sql """DROP TABLE IF EXISTS 
${pgDB}.${pgSchema}.${tableComposite}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+                  "event_date" date NOT NULL,
+                  "id" int4 NOT NULL,
+                  "payload" varchar(200),
+                  PRIMARY KEY ("event_date", "id")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${tableDate},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "1",
+                    "snapshot_parallelism" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Wait (up to 300s, polling every second) until the streaming job reports at least 5 succeeded tasks.
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                log.info("jobSuccendCount: " + jobSuccendCount)
+                // Compare numerically: as strings '5' <= '10' is false, so counts 10..49 would never satisfy the wait.
+                jobSuccendCount.size() == 1 && ((jobSuccendCount.get(0).get(0) as String) ?: '0').toInteger() >= 5
+            })
+        } catch (Exception ex) {
+            // On timeout or query failure, log the job and task rows for diagnosis, then rethrow to fail the suite.
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-06', 'F1');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 
'B1_upd' WHERE event_date = '2025-01-02';"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE 
event_date = '2025-01-01';"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 
'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE 
event_date = '2025-02-01' AND id = 1;"""
+        }
+
+        sleep(60000)
+
+        qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        def jobInfo = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert jobInfo.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to