This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2c834003fc [regression-test](streaming-job) add cdc cases for 
source/jdbc timezone and TIMESTAMP/timestamptz pk (#63543)
e2c834003fc is described below

commit e2c834003fcf5bb6f863639e0792a80adb65df02
Author: wudi <[email protected]>
AuthorDate: Fri Jun 5 11:12:04 2026 +0800

    [regression-test](streaming-job) add cdc cases for source/jdbc timezone and 
TIMESTAMP/timestamptz pk (#63543)
    
    ### What problem does this PR solve?
    
    Add CDC streaming-job regression coverage for timezone behavior and
    TIMESTAMP / timestamptz chunk-key paths that the existing suites do not
    exercise.
    
    #### Cases added
    
    | Case | Tables | Guards |
    |---|---|---|
    | `*_source_timezone` (mysql + pg) | TIMESTAMP/timestamptz
    multi-precision + DATETIME/timestamp + DATE/date (pg: + `timetz`) |
    source-side multi-tz INSERT (+08 / -05 / UTC), NULL, UPDATE crossing tz,
    epoch boundary; pg `timetz` column kept as a regression guard for the
    upstream JVM-tz handling |
    | `*_jdbc_servertimezone` (mysql + pg) | TIMESTAMP/timestamptz +
    DATETIME/timestamp | recommended end-to-end config — align `jdbc_url`'s
    `serverTimezone`/`timezone` with Doris session `time_zone` (read at
    runtime so it works on any default tz) |
    | `*_timestamp_pk` (mysql + pg) | mysql TIMESTAMP(6) + composite
    (TIMESTAMP, id); pg timestamp(6) + timestamptz(6) + composite
    (timestamptz, id) | chunk-key split + INSERT/UPDATE/DELETE locating on
    TIMESTAMP/timestamptz PK; depends on #63471 for `LocalDateTime` /
    `OffsetDateTime` chunk-bound restore in
    `AbstractCdcSourceReader.convertBound` |
---
 .../deserialize/DebeziumJsonDeserializer.java      |  15 ++
 .../source/reader/AbstractCdcSourceReader.java     |  23 ++-
 .../deserialize/DebeziumJsonDeserializerTest.java  |  50 +++++
 ...est_streaming_mysql_job_jdbc_servertimezone.out |  14 ++
 .../test_streaming_mysql_job_source_timezone.out   |  29 +++
 .../cdc/test_streaming_mysql_job_timestamp_pk.out  |  28 +++
 ..._streaming_postgres_job_jdbc_servertimezone.out |  15 ++
 ...test_streaming_postgres_job_source_timezone.out |  29 +++
 .../test_streaming_postgres_job_timestamp_pk.out   |  42 ++++
 ..._streaming_mysql_job_jdbc_servertimezone.groovy | 144 +++++++++++++
 ...test_streaming_mysql_job_source_timezone.groovy | 218 ++++++++++++++++++++
 .../test_streaming_mysql_job_timestamp_pk.groovy   | 167 +++++++++++++++
 ...reaming_postgres_job_jdbc_servertimezone.groovy | 154 ++++++++++++++
 ...t_streaming_postgres_job_source_timezone.groovy | 226 +++++++++++++++++++++
 ...test_streaming_postgres_job_timestamp_pk.groovy | 193 ++++++++++++++++++
 15 files changed, 1346 insertions(+), 1 deletion(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 6881dd70d40..8487975f7d1 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -69,6 +69,7 @@ import io.debezium.time.NanoTime;
 import io.debezium.time.NanoTimestamp;
 import io.debezium.time.Time;
 import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTime;
 import io.debezium.time.ZonedTimestamp;
 import lombok.Getter;
 import lombok.Setter;
@@ -255,6 +256,8 @@ public class DebeziumJsonDeserializer
                     return convertTimestamp(name, dbzObj);
                 case ZonedTimestamp.SCHEMA_NAME:
                     return convertZoneTimestamp(dbzObj);
+                case ZonedTime.SCHEMA_NAME:
+                    return convertZoneTime(dbzObj);
                 case Decimal.LOGICAL_NAME:
                     return convertDecimal(dbzObj, fieldSchema);
                 case Bits.LOGICAL_NAME:
@@ -317,6 +320,18 @@ public class DebeziumJsonDeserializer
         return dbzObj.toString();
     }
 
+    private Object convertZoneTime(Object dbzObj) {
+        // timetz has no date, so a named zone's DST offset cannot be 
resolved. Following
+        // Debezium/PostgreSQL semantics, keep Debezium's UTC-normalized 
ZonedTime string as-is
+        // (offset preserved) rather than shifting it into serverTimeZone, 
which would drop the
+        // offset and mishandle DST.
+        if (dbzObj instanceof String) {
+            return dbzObj;
+        }
+        LOG.warn("Unable to convert to zone time, default {}", dbzObj);
+        return dbzObj.toString();
+    }
+
     private Object convertTimestamp(String typeName, Object dbzObj) {
         if (dbzObj instanceof Long) {
             switch (typeName) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 6c46482dc6d..74eb534bc5f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -100,13 +100,34 @@ public abstract class AbstractCdcSourceReader implements 
SourceReader {
     private static final Map<Class<?>, Function<String, Object>> BOUND_PARSERS 
=
             Map.of(
                     java.sql.Date.class, java.sql.Date::valueOf,
-                    java.sql.Timestamp.class, java.sql.Timestamp::valueOf,
+                    java.sql.Timestamp.class, 
AbstractCdcSourceReader::parseTimestampBound,
                     java.sql.Time.class, java.sql.Time::valueOf,
                     java.time.LocalDateTime.class, 
java.time.LocalDateTime::parse,
                     java.time.LocalDate.class, java.time.LocalDate::parse,
                     java.time.LocalTime.class, java.time.LocalTime::parse,
                     java.time.OffsetDateTime.class, 
java.time.OffsetDateTime::parse);
 
+    // Offset suffix like "+08:00" / "-05:00" ("Z" is handled separately).
+    private static final java.util.regex.Pattern OFFSET_SUFFIX =
+            java.util.regex.Pattern.compile("[+-]\\d{2}:\\d{2}$");
+
+    /**
+     * A java.sql.Timestamp bound is serialized to FE by Jackson as ISO-8601 
(with a zone offset
+     * when WRITE_DATES_AS_TIMESTAMPS is off), which 
java.sql.Timestamp#valueOf cannot parse.
+     * Reconstruct instant-faithfully so the rebuilt bound binds back to the 
same value flink-cdc
+     * read.
+     */
+    private static java.sql.Timestamp parseTimestampBound(String s) {
+        try {
+            return java.sql.Timestamp.valueOf(s); // legacy SQL form 
"yyyy-MM-dd HH:mm:ss[.f]"
+        } catch (IllegalArgumentException notSqlForm) {
+            if (s.endsWith("Z") || OFFSET_SUFFIX.matcher(s).find()) {
+                return 
java.sql.Timestamp.from(java.time.OffsetDateTime.parse(s).toInstant());
+            }
+            return 
java.sql.Timestamp.valueOf(java.time.LocalDateTime.parse(s)); // ISO local ('T')
+        }
+    }
+
     private static Object convertBound(Object v, Class<?> target, ObjectMapper 
mapper) {
         if (v == null) {
             return null;
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
index ac4d7b58226..75b2ffbfe9e 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.source.deserialize;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Method;
+import java.time.ZoneId;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -73,4 +74,53 @@ class DebeziumJsonDeserializerTest {
             throw new RuntimeException(e);
         }
     }
+
+    // ─── convertZoneTime 
──────────────────────────────────────────────────────
+    // timetz arrives as a UTC-normalized ISO string (Debezium ZonedTime). cdc 
keeps it
+    // verbatim with the offset preserved, independent of serverTimeZone, 
since a
+    // date-less time cannot resolve a named zone's DST. Mirrors 
Debezium/PostgreSQL.
+
+    @Test
+    void zoneTime_utc_preservesOffset() {
+        deserializer.setServerTimeZone(ZoneId.of("UTC"));
+        assertEquals("12:00:00.123456Z", 
invokeConvertZoneTime("12:00:00.123456Z"));
+    }
+
+    @Test
+    void zoneTime_plus08_serverTimeZoneIgnored() {
+        deserializer.setServerTimeZone(ZoneId.of("+08:00"));
+        // serverTimeZone must not shift timetz; the offset-bearing string is 
kept as-is
+        assertEquals("12:00:00.123456Z", 
invokeConvertZoneTime("12:00:00.123456Z"));
+    }
+
+    @Test
+    void zoneTime_minus05_serverTimeZoneIgnored() {
+        deserializer.setServerTimeZone(ZoneId.of("-05:00"));
+        assertEquals("01:00:00.123456Z", 
invokeConvertZoneTime("01:00:00.123456Z"));
+    }
+
+    @Test
+    void zoneTime_dstZone_notShifted() {
+        // a DST zone's offset is date-dependent; timetz has no date, so it 
must not be
+        // shifted -- the input is returned unchanged regardless of New York 
DST.
+        deserializer.setServerTimeZone(ZoneId.of("America/New_York"));
+        assertEquals("12:00:00.123456Z", 
invokeConvertZoneTime("12:00:00.123456Z"));
+    }
+
+    @Test
+    void zoneTime_wholeSecond_keepsSeconds() {
+        deserializer.setServerTimeZone(ZoneId.of("UTC"));
+        assertEquals("00:00:00Z", invokeConvertZoneTime("00:00:00Z"));
+    }
+
+    private Object invokeConvertZoneTime(Object dbzObj) {
+        try {
+            Method m =
+                    
DebeziumJsonDeserializer.class.getDeclaredMethod("convertZoneTime", 
Object.class);
+            m.setAccessible(true);
+            return m.invoke(deserializer, dbzObj);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
new file mode 100644
index 00000000000..7e95f55810a
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc --
+id     int     No      true    \N      
+tag    varchar(96)     Yes     false   \N      NONE
+ts0    datetime        Yes     false   \N      NONE
+dt0    datetime        Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      snapshot_tokyo  2024-06-15T11:00        2024-06-15T11:00
+
+-- !select_binlog --
+1      snapshot_tokyo  2024-06-15T11:00        2024-06-15T11:00
+2      binlog_tokyo    2024-06-15T11:00        2024-06-15T11:00
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
new file mode 100644
index 00000000000..dbed0d086d1
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc --
+id     int     No      true    \N      
+tag    varchar(96)     Yes     false   \N      NONE
+ts0    datetime        Yes     false   \N      NONE
+ts3    datetime(3)     Yes     false   \N      NONE
+ts6    datetime(6)     Yes     false   \N      NONE
+dt0    datetime        Yes     false   \N      NONE
+dt6    datetime(6)     Yes     false   \N      NONE
+d      date    Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      snapshot_plus08 2024-06-15T12:00        2024-06-15T12:00:00.123 
2024-06-15T12:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+2      snapshot_minus05        2024-06-16T01:00        2024-06-16T01:00:00.123 
2024-06-16T01:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+3      snapshot_utc    2024-06-15T20:00        2024-06-15T20:00:00.123 
2024-06-15T20:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+4      snapshot_null   \N      \N      \N      \N      \N      \N
+5      snapshot_epoch_plus08   1970-01-01T00:00:01     1970-01-01T00:00:01.123 
1970-01-01T00:00:01.123456      1970-01-01T08:00:01     
1970-01-01T08:00:01.123456      1970-01-01
+
+-- !select_binlog --
+1      snapshot_plus08 2024-06-15T14:00        2024-06-15T12:00:00.123 
2024-06-15T12:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+2      snapshot_minus05        2024-06-16T01:00        2024-06-16T01:00:00.123 
2024-06-16T01:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+3      snapshot_utc    2024-06-15T20:00        2024-06-15T20:00:00.123 
2024-06-15T20:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+4      snapshot_null   \N      \N      \N      \N      \N      \N
+5      snapshot_epoch_plus08   1970-01-01T00:00:01     1970-01-01T00:00:01.123 
1970-01-01T00:00:01.123456      1970-01-01T08:00:01     
1970-01-01T08:00:01.123456      1970-01-01
+101    binlog_plus08   2024-06-15T12:00        2024-06-15T12:00:00.123 
2024-06-15T12:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+102    binlog_minus05  2024-06-16T01:00        2024-06-16T01:00:00.123 
2024-06-16T01:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+103    binlog_utc      2024-06-15T20:00        2024-06-15T20:00:00.123 
2024-06-15T20:00:00.123456      2024-06-15T20:00        
2024-06-15T20:00:00.123456      2024-06-15
+105    binlog_epoch_plus08     1970-01-01T00:00:01     1970-01-01T00:00:01.123 
1970-01-01T00:00:01.123456      1970-01-01T08:00:01     
1970-01-01T08:00:01.123456      1970-01-01
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
new file mode 100644
index 00000000000..2691ac4ac12
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
@@ -0,0 +1,28 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_timestamp_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B1
+2025-01-01T00:00       C1
+2025-06-15T12:34:56.999999     D1
+2026-01-01T00:00       E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C2
+2024-02-03T23:59:59.999999     4       D2
+2024-02-04T00:00       5       E2
+
+-- !select_after_incr_timestamp_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B2_upd
+2025-01-01T00:00       C1
+2026-01-01T00:00       E1
+2026-06-01T00:00       F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C3_upd
+2024-02-04T00:00       5       E2
+2024-02-05T00:00       6       F3
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
new file mode 100644
index 00000000000..ba312fbce72
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc --
+id     int     No      true    \N      
+tag    text    Yes     false   \N      NONE
+ts     datetime(6)     Yes     false   \N      NONE
+tstz   datetime(6)     Yes     false   \N      NONE
+ttz    text    Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      snapshot_tokyo  2024-06-15T11:00        2024-06-15T11:00        
02:00:00Z
+
+-- !select_binlog --
+1      snapshot_tokyo  2024-06-15T11:00        2024-06-15T11:00        
02:00:00Z
+2      binlog_tokyo    2024-06-15T11:00        2024-06-15T11:00        
02:00:00Z
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
new file mode 100644
index 00000000000..a7c3fd1274c
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc --
+id     int     No      true    \N      
+tag    text    Yes     false   \N      NONE
+ts     datetime(6)     Yes     false   \N      NONE
+tstz0  datetime        Yes     false   \N      NONE
+tstz3  datetime(3)     Yes     false   \N      NONE
+tstz6  datetime(6)     Yes     false   \N      NONE
+ttz    text    Yes     false   \N      NONE
+d      date    Yes     false   \N      NONE
+
+-- !select_snapshot --
+1      snapshot_plus08 2024-06-15T20:00        2024-06-15T12:00        
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456      12:00:00.123456Z        
2024-06-15
+2      snapshot_minus05        2024-06-15T20:00        2024-06-16T01:00        
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456      01:00:00.123456Z        
2024-06-15
+3      snapshot_utc    2024-06-15T20:00        2024-06-15T20:00        
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456      20:00:00.123456Z        
2024-06-15
+4      snapshot_null   \N      \N      \N      \N      \N      \N
+5      snapshot_epoch_plus08   1970-01-01T08:00:01     1970-01-01T00:00:01     
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456      00:00:01.123456Z        
1970-01-01
+
+-- !select_binlog --
+1      snapshot_plus08 2024-06-15T20:00        2024-06-15T14:00        
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456      12:00:00.123456Z        
2024-06-15
+2      snapshot_minus05        2024-06-15T20:00        2024-06-16T01:00        
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456      01:00:00.123456Z        
2024-06-15
+3      snapshot_utc    2024-06-15T20:00        2024-06-15T20:00        
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456      20:00:00.123456Z        
2024-06-15
+4      snapshot_null   \N      \N      \N      \N      \N      \N
+5      snapshot_epoch_plus08   1970-01-01T08:00:01     1970-01-01T00:00:01     
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456      00:00:01.123456Z        
1970-01-01
+101    binlog_plus08   2024-06-15T20:00        2024-06-15T12:00        
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456      12:00:00.123456Z        
2024-06-15
+102    binlog_minus05  2024-06-15T20:00        2024-06-16T01:00        
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456      01:00:00.123456Z        
2024-06-15
+103    binlog_utc      2024-06-15T20:00        2024-06-15T20:00        
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456      20:00:00.123456Z        
2024-06-15
+105    binlog_epoch_plus08     1970-01-01T08:00:01     1970-01-01T00:00:01     
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456      00:00:01.123456Z        
1970-01-01
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
new file mode 100644
index 00000000000..a8ef3d6d9af
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_timestamp_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B1
+2025-01-01T00:00       C1
+2025-06-15T12:34:56.999999     D1
+2026-01-01T00:00       E1
+
+-- !select_snapshot_timestamptz_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B1
+2025-01-01T00:00       C1
+2025-06-15T12:34:56.999999     D1
+2026-01-01T00:00       E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C2
+2024-02-03T23:59:59.999999     4       D2
+2024-02-04T00:00       5       E2
+
+-- !select_after_incr_timestamp_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B2_upd
+2025-01-01T00:00       C1
+2026-01-01T00:00       E1
+2026-06-01T00:00       F2
+
+-- !select_after_incr_timestamptz_pk --
+2024-01-01T00:00       A1
+2024-06-15T12:00:00.123456     B2_upd
+2025-01-01T00:00       C1
+2026-01-01T00:00       E1
+2026-06-01T00:00       F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00       1       A2
+2024-02-01T00:00       2       B2
+2024-02-02T12:00:00.500        3       C3_upd
+2024-02-04T00:00       5       E2
+2024-02-05T00:00       6       F3
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
new file mode 100644
index 00000000000..5f108efff78
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Recommended end-to-end tz configuration for data fidelity: set jdbc_url's
+// serverTimezone to the SOURCE MySQL session/server tz, so cdc renders the
+// TIMESTAMP instant back to the exact wall clock the source shows. Doris data
+// then stays identical to MySQL, independent of Doris's own session tz.
+//
+// Source tz is +09 (Tokyo), deliberately != Doris default +08, so the case
+// proves the rendering follows the SOURCE tz (not Doris). Both the source
+// session and jdbc_url use the fixed offset '+09:00' (not a named zone): 
forcing
+// the connection session to an offset needs no MySQL tz tables, and with no 
DST
+// the offset is constant so the result is fully deterministic.
+//
+// Setup (source tz = +09):
+//   source SET SESSION time_zone='+09:00', INSERT '2024-06-15 11:00:00'
+//     ts0 (TIMESTAMP) -> source-internal UTC instant 2024-06-15 02:00:00Z
+//     dt0 (DATETIME)  -> literal '2024-06-15 11:00:00'
+//   jdbc_url serverTimezone aligned to the SOURCE tz (+09:00)
+//
+// Expectation at Doris (independent of Doris session tz, since cdc renders
+// with the source tz, not Doris's; .out has no dependency on Doris tz):
+//   ts0 -> '2024-06-15T11:00'  (02:00Z rendered with +09:00 = 11:00, == 
source)
+//   dt0 -> '2024-06-15T11:00'  (DATETIME has no tz semantics, stored verbatim)
+suite("test_streaming_mysql_job_jdbc_servertimezone", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_jdbc_servertimezone_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_mysql_jdbc_servertimezone"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // jdbc serverTimezone is aligned to the SOURCE db tz (not Doris) so
+        // Doris data matches the source wall clock. Log Doris tz only to show
+        // the result is independent of it.
+        def sourceTz = "+09:00"
+        // %2B is a URL-encoded '+': a literal '+' in a jdbc URL decodes to a 
space
+        def jdbcTz = "%2B09:00"
+        log.info("Doris session time_zone = ${(sql "select 
@@time_zone")[0][0]}; cdc renders with source tz ${jdbcTz}.")
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """
+            create table ${mysqlDb}.${table1} (
+                id int primary key,
+                tag varchar(32),
+                ts0 timestamp null,
+                dt0 datetime null
+            ) engine=innodb charset=utf8;
+            """
+
+            sql """SET SESSION time_zone = '${sourceTz}'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 
'snapshot_tokyo',
+                '2024-06-15 11:00:00', '2024-06-15 11:00:00')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=${jdbcTz}&forceConnectionTimeZoneToSession=true",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_desc """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select * from ${currentDb}.${table1} order by 
id;"""
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """SET SESSION time_zone = '${sourceTz}'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'binlog_tokyo',
+                '2024-06-15 11:00:00', '2024-06-15 11:00:00')"""
+        }
+
+        Awaitility.await().atMost(180, SECONDS)
+                .pollInterval(2, SECONDS).until(
+                {
+                    def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                    cnt.get(0).get(0) == 2
+                }
+        )
+
+        qt_select_binlog """select * from ${currentDb}.${table1} order by 
id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
new file mode 100644
index 00000000000..27e62a487c2
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify cdc tz handling when source session tz differs from cdc-client tz.
+//
+// MySQL TIMESTAMP stores a UTC instant; the session tz only affects how the
+// wall clock is parsed on write and rendered on read. DATETIME / DATE are
+// wall-clock verbatim, no tz transform.
+//
+// Coverage in one table:
+//   * source session tz set to +08 / -05 / +00, same wall clock written ->
+//     different UTC instants prove cdc honors source session tz (not a
+//     hardcoded offset, not the cdc-client JVM tz).
+//   * NULL row across every temporal column.
+//   * MySQL TIMESTAMP epoch lower bound ('1970-01-01 00:00:01Z').
+//   * Binlog path mirrors snapshot themes, plus an UPDATE that rewrites a
+//     TIMESTAMP column under +08.
+//
+// jdbc_url uses serverTimezone=UTC so cdc renders TIMESTAMP back to UTC
+// wall clock regardless of the source session offset.
+suite("test_streaming_mysql_job_source_timezone", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_source_timezone_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_mysql_source_timezone"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """
+            create table ${mysqlDb}.${table1} (
+                id int primary key,
+                tag varchar(32),
+                ts0 timestamp null,
+                ts3 timestamp(3) null,
+                ts6 timestamp(6) null,
+                dt0 datetime null,
+                dt6 datetime(6) null,
+                d date null
+            ) engine=innodb charset=utf8;
+            """
+
+            // id=1: +08 baseline
+            sql """SET SESSION time_zone = '+08:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 
'snapshot_plus08',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=2: -05 -> same wall clock crosses to next UTC day (20:00 -05 
= 01:00 next day UTC)
+            sql """SET SESSION time_zone = '-05:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 
'snapshot_minus05',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=3: +00 -> wall clock == UTC instant
+            sql """SET SESSION time_zone = '+00:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'snapshot_utc',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=4: NULL across every nullable column
+            sql """INSERT INTO ${mysqlDb}.${table1} (id, tag) VALUES (4, 
'snapshot_null')"""
+
+            // id=5: epoch lower bound. MySQL TIMESTAMP min is '1970-01-01 
00:00:01' UTC,
+            // so under +08 the smallest legal wall clock is '1970-01-01 
08:00:01'.
+            sql """SET SESSION time_zone = '+08:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (5, 
'snapshot_epoch_plus08',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123',
+                '1970-01-01 08:00:01.123456',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123456',
+                '1970-01-01')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_desc """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select * from ${currentDb}.${table1} order by 
id;"""
+
+        // Binlog phase: same tz themes through binlog, plus an UPDATE that
+        // rewrites ts0 on id=1 under +08 to confirm UPDATE follows the same
+        // tz codepath as INSERT.
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """SET SESSION time_zone = '+08:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (101, 
'binlog_plus08',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET SESSION time_zone = '-05:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (102, 
'binlog_minus05',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET SESSION time_zone = '+00:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (103, 'binlog_utc',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET SESSION time_zone = '+08:00'"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (105, 
'binlog_epoch_plus08',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123',
+                '1970-01-01 08:00:01.123456',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123456',
+                '1970-01-01')"""
+
+            // UPDATE: id=1 was '20:00 +08' (UTC 12:00). Push wall clock to 
'22:00 +08'
+            // (UTC 14:00) so we can poll for completion.
+            sql """UPDATE ${mysqlDb}.${table1} SET ts0 = '2024-06-15 22:00:00' 
WHERE id = 1"""
+        }
+
+        // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle.
+        Awaitility.await().atMost(180, SECONDS)
+                .pollInterval(2, SECONDS).until(
+                {
+                    def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                    if (cnt.get(0).get(0) != 9) return false
+                    def updated = sql """select ts0 from 
${currentDb}.${table1} where id = 1"""
+                    return 
updated.get(0).get(0).toString().startsWith('2024-06-15T14:00')
+                }
+        )
+
+        qt_select_binlog """select * from ${currentDb}.${table1} order by 
id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
new file mode 100644
index 00000000000..514ff72292d
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Cover TIMESTAMP chunk-key path. TIMESTAMP is the epoch+tz column type,
+// driver returns java.time.LocalDateTime by default; chunk-bound JSON
+// round-trip exercises the LocalDateTime branch in
+// AbstractCdcSourceReader.convertBound.
+//
+// Both source session and jdbc_url are pinned to UTC so the TIMESTAMP wall
+// clock written by INSERT equals the value cdc renders, and the .out can
+// be filled deterministically.
+suite("test_streaming_mysql_job_timestamp_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_timestamp_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableTs = "events_mysql_timestamp_pk"
+    def tableComposite = "events_mysql_timestamp_id_pk"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableTs} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """SET SESSION time_zone = '+00:00'"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableTs}"""
+            // Explicit DEFAULT suppresses MySQL 5.7 implicit ON UPDATE 
CURRENT_TIMESTAMP on the
+            // first TIMESTAMP column, so UPDATE does not auto-rewrite the 
event_ts primary key.
+            sql """CREATE TABLE ${mysqlDb}.${tableTs} (
+                  `event_ts` timestamp(6) NOT NULL DEFAULT '1970-01-01 
00:00:01',
+                  `payload` varchar(64),
+                  PRIMARY KEY (`event_ts`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-01-01 
00:00:00.000000', 'A1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-06-15 
12:00:00.123456', 'B1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-01-01 
00:00:00.000000', 'C1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-06-15 
12:34:56.999999', 'D1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-01-01 
00:00:00.000000', 'E1')"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+                  `event_ts` timestamp(6) NOT NULL DEFAULT '1970-01-01 
00:00:01',
+                  `id` int NOT NULL,
+                  `payload` varchar(64),
+                  PRIMARY KEY (`event_ts`, `id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableTs},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableTs}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        log.info("snapshot row count ts=${c1} composite=${c2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_timestamp_pk """select event_ts, payload from 
${currentDb}.${tableTs} order by event_ts asc"""
+        qt_select_snapshot_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+            sql """SET SESSION time_zone = '+00:00'"""
+
+            sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-06-01 
00:00:00.000000', 'F2')"""
+            sql """UPDATE ${mysqlDb}.${tableTs} SET payload='B2_upd' WHERE 
event_ts='2024-06-15 12:00:00.123456'"""
+            sql """DELETE FROM ${mysqlDb}.${tableTs} WHERE 
event_ts='2025-06-15 12:34:56.999999'"""
+
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES 
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+            sql """UPDATE ${mysqlDb}.${tableComposite} SET payload='C3_upd' 
WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+            sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE 
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableTs}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        def upd1 = sql """select payload from 
${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'"""
+                        def upd2 = sql """select payload from 
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and 
id=3"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and 
id=4"""
+                        def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        log.info("incr ts=${c1} composite=${c2} ts_upd=${p1} 
comp_upd=${p2} ts_del=${del1} comp_del=${del2}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
+                                p1 == 'B2_upd' && p2 == 'C3_upd' &&
+                                del1.get(0).get(0) == 0 && del2.get(0).get(0) 
== 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr_timestamp_pk """select event_ts, payload from 
${currentDb}.${tableTs} order by event_ts asc"""
+        qt_select_after_incr_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
new file mode 100644
index 00000000000..13b2e4c09ab
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// PG counterpart of test_streaming_mysql_job_jdbc_servertimezone.
+//
+// Recommended end-to-end tz configuration for data fidelity: set jdbc_url's
+// timezone to the SOURCE PG session/server tz, so cdc renders the timestamptz
+// instant back to the exact wall clock the source shows. Doris data then stays
+// identical to PG, independent of Doris's own session tz.
+//
+// Source tz is Asia/Tokyo (+09, no DST), deliberately != Doris default +08, so
+// the case proves the rendering follows the SOURCE tz (not Doris). The source
+// session uses the offset '+09:00' while jdbc_url uses the IANA name
+// 'Asia/Tokyo' (cdc resolves it via ZoneId).
+//
+// timetz is NOT rendered into the source tz: a time-of-day has no date, so a
+// named zone's DST offset cannot be resolved. Mirroring Debezium/PostgreSQL,
+// cdc keeps timetz UTC-normalized with its offset (e.g. '02:00Z').
+//
+// Setup (source tz = Asia/Tokyo = +09):
+//   source SET TIME ZONE INTERVAL '+09:00' HOUR TO MINUTE, INSERT '2024-06-15 
11:00:00'
+//     ts   (timestamp)   -> literal '2024-06-15 11:00:00'
+//     tstz (timestamptz) -> source-internal UTC instant 2024-06-15 02:00:00Z
+//     ttz  (timetz)      -> source-internal UTC time 02:00:00Z
+//   jdbc_url timezone aligned to the SOURCE tz (Asia/Tokyo)
+//
+// Expectation at Doris (independent of Doris session tz, since cdc renders
+// with the source tz, not Doris's; .out has no dependency on Doris tz):
+//   ts   -> '2024-06-15T11:00'  (verbatim, no tz semantics)
+//   tstz -> '2024-06-15T11:00'  (02:00Z rendered with Asia/Tokyo +09 = 11:00, 
== source)
+//   ttz  -> '02:00:00Z'         (UTC-normalized time-of-day, kept as-is; not 
zone-rendered)
+suite("test_streaming_postgres_job_jdbc_servertimezone", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_jdbc_servertimezone_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_pg_jdbc_servertimezone"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // jdbc timezone is aligned to the SOURCE db tz (not Doris) so Doris
+        // data matches the source wall clock. Log Doris tz only to show the
+        // result is independent of it.
+        def sourceTz = "+09:00"
+        def jdbcTz = "Asia/Tokyo"
+        log.info("Doris session time_zone = ${(sql "select 
@@time_zone")[0][0]}; cdc renders with source tz ${jdbcTz}.")
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                id   integer PRIMARY KEY,
+                tag  varchar(32),
+                ts   timestamp,
+                tstz timestamp with time zone,
+                ttz  time with time zone
+            );
+            """
+
+            sql """SET TIME ZONE INTERVAL '+09:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 
'snapshot_tokyo',
+                '2024-06-15 11:00:00', '2024-06-15 11:00:00', '11:00:00')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=${jdbcTz}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 1
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_desc """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select * from ${currentDb}.${table1} order by 
id;"""
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """SET TIME ZONE INTERVAL '+09:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 
'binlog_tokyo',
+                '2024-06-15 11:00:00', '2024-06-15 11:00:00', '11:00:00')"""
+        }
+
+        Awaitility.await().atMost(180, SECONDS)
+                .pollInterval(2, SECONDS).until(
+                {
+                    def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                    cnt.get(0).get(0) == 2
+                }
+        )
+
+        qt_select_binlog """select * from ${currentDb}.${table1} order by 
id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
new file mode 100644
index 00000000000..f9beddd3e00
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// PG counterpart of test_streaming_mysql_job_source_timezone.
+//
+// PG temporal semantics relevant to cdc tz handling:
+//   timestamp   - wall clock, no tz; debezium emits epoch-style schema, cdc
+//                 bypasses serverTimeZone.
+//   timestamptz - normalized to UTC on write per session TimeZone; debezium
+//                 emits ZonedTimestamp ISO string, cdc renders it using
+//                 serverTimeZone.
+//   timetz      - time-of-day with offset. Debezium emits ZonedTime, a
+//                 UTC-normalized OffsetTime; cdc keeps it as-is (offset
+//                 preserved) rather than rendering into a named zone, since a
+//                 date-less time cannot resolve DST. Mirrors 
Debezium/PostgreSQL.
+//   date        - no tz; literal day, must not drift across +08 boundary.
+//
+// Coverage:
+//   * source session tz set to +08 / -05 / +00, same wall clock written ->
+//     different UTC instants for timestamptz prove cdc honors source session.
+//   * NULL row across every temporal column.
+//   * epoch lower bound ('1970-01-01 00:00:01Z').
+//   * Binlog path mirrors snapshot themes, plus an UPDATE on tstz0 under +08.
+//
+// jdbc_url uses timezone=UTC so cdc renders timestamptz back to UTC wall
+// clock regardless of the source session offset.
+suite("test_streaming_postgres_job_source_timezone", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_source_timezone_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_pg_source_timezone"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                id    integer PRIMARY KEY,
+                tag   varchar(32),
+                ts    timestamp,
+                tstz0 timestamptz(0),
+                tstz3 timestamptz(3),
+                tstz6 timestamptz(6),
+                ttz   time with time zone,
+                d     date
+            );
+            """
+
+            // INTERVAL form avoids depending on the container's tzdata.
+            // id=1: +08 baseline
+            sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 
'snapshot_plus08',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=2: -05 -> same wall clock crosses to next UTC day for 
timestamptz
+            sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 
'snapshot_minus05',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=3: +00 -> wall clock == UTC instant
+            sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 
'snapshot_utc',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            // id=4: NULL across every nullable column
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES 
(4, 'snapshot_null')"""
+
+            // id=5: epoch lower bound under +08
+            sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5, 
'snapshot_epoch_plus08',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123',
+                '1970-01-01 08:00:01.123456',
+                '08:00:01.123456',
+                '1970-01-01')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_desc """desc ${currentDb}.${table1};"""
+        qt_select_snapshot """select * from ${currentDb}.${table1} order by 
id;"""
+
+        // Binlog phase: same tz themes, plus an UPDATE that rewrites tstz0
+        // on id=1 under +08 to confirm UPDATE follows the same tz codepath.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101, 
'binlog_plus08',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102, 
'binlog_minus05',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103, 
'binlog_utc',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00',
+                '2024-06-15 20:00:00.123',
+                '2024-06-15 20:00:00.123456',
+                '20:00:00.123456',
+                '2024-06-15')"""
+
+            sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (105, 
'binlog_epoch_plus08',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01',
+                '1970-01-01 08:00:01.123',
+                '1970-01-01 08:00:01.123456',
+                '08:00:01.123456',
+                '1970-01-01')"""
+
+            // UPDATE: id=1 tstz0 was '20:00 +08' (UTC 12:00). Push wall clock 
to
+            // '22:00 +08' (UTC 14:00) so we can poll for completion.
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET tstz0 = 
'2024-06-15 22:00:00' WHERE id = 1"""
+        }
+
+        // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle.
+        Awaitility.await().atMost(180, SECONDS)
+                .pollInterval(2, SECONDS).until(
+                {
+                    def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                    if (cnt.get(0).get(0) != 9) return false
+                    def updated = sql """select tstz0 from 
${currentDb}.${table1} where id = 1"""
+                    return 
updated.get(0).get(0).toString().startsWith('2024-06-15T14:00')
+                }
+        )
+
+        qt_select_binlog """select * from ${currentDb}.${table1} order by 
id;"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
new file mode 100644
index 00000000000..02939cc2307
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Cover PG timestamp / timestamptz chunk-key paths: timestamp (no tz,
+// driver returns LocalDateTime), timestamptz (driver returns
+// OffsetDateTime), plus a composite (timestamptz, id) PK to exercise
+// multi-column locating.
+//
+// Source SET TIME ZONE = UTC and jdbc_url timezone=UTC so the timestamptz
+// wall clock written by INSERT equals the value cdc renders, and the .out
+// can be filled deterministically.
+suite("test_streaming_postgres_job_timestamp_pk", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_timestamp_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableTs = "events_pg_timestamp_pk"
+    def tableTstz = "events_pg_timestamptz_pk"
+    def tableComposite = "events_pg_timestamptz_id_pk"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableTs} force"""
+    sql """drop table if exists ${currentDb}.${tableTstz} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTs}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTs} (
+                  event_ts timestamp(6) NOT NULL,
+                  payload  varchar(64),
+                  PRIMARY KEY (event_ts)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2024-01-01 00:00:00.000000', 'A1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2024-06-15 12:00:00.123456', 'B1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2025-01-01 00:00:00.000000', 'C1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2025-06-15 12:34:56.999999', 'D1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2026-01-01 00:00:00.000000', 'E1')"""
+
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTstz}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTstz} (
+                  event_ts timestamptz(6) NOT NULL,
+                  payload  varchar(64),
+                  PRIMARY KEY (event_ts)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2024-01-01 00:00:00.000000', 'A1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2024-06-15 12:00:00.123456', 'B1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2025-01-01 00:00:00.000000', 'C1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2025-06-15 12:34:56.999999', 'D1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2026-01-01 00:00:00.000000', 'E1')"""
+
+            sql """DROP TABLE IF EXISTS 
${pgDB}.${pgSchema}.${tableComposite}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+                  event_ts timestamptz(6) NOT NULL,
+                  id       integer        NOT NULL,
+                  payload  varchar(64),
+                  PRIMARY KEY (event_ts, id)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = 
"${tableTs},${tableTstz},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableTs}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableTstz}"""
+                        def c3 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        log.info("snapshot row count ts=${c1} tstz=${c2} 
composite=${c3}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && 
c3.get(0).get(0) == 5
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_timestamp_pk """select event_ts, payload from 
${currentDb}.${tableTs} order by event_ts asc"""
+        qt_select_snapshot_timestamptz_pk """select event_ts, payload from 
${currentDb}.${tableTstz} order by event_ts asc"""
+        qt_select_snapshot_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES 
('2026-06-01 00:00:00.000000', 'F2')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableTs} SET payload='B2_upd' 
WHERE event_ts='2024-06-15 12:00:00.123456'"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTs} WHERE 
event_ts='2025-06-15 12:34:56.999999'"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES 
('2026-06-01 00:00:00.000000', 'F2')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableTstz} SET 
payload='B2_upd' WHERE event_ts='2024-06-15 12:00:00.123456'"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTstz} WHERE 
event_ts='2025-06-15 12:34:56.999999'"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES 
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET 
payload='C3_upd' WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE 
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def c1 = sql """select count(1) from 
${currentDb}.${tableTs}"""
+                        def c2 = sql """select count(1) from 
${currentDb}.${tableTstz}"""
+                        def c3 = sql """select count(1) from 
${currentDb}.${tableComposite}"""
+                        def upd1 = sql """select payload from 
${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'"""
+                        def upd2 = sql """select payload from 
${currentDb}.${tableTstz} where event_ts='2024-06-15 12:00:00.123456'"""
+                        def upd3 = sql """select payload from 
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and 
id=3"""
+                        def del1 = sql """select count(1) from 
${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'"""
+                        def del2 = sql """select count(1) from 
${currentDb}.${tableTstz} where event_ts='2025-06-15 12:34:56.999999'"""
+                        def del3 = sql """select count(1) from 
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and 
id=4"""
+                        def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                        def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+                        def p3 = upd3.size() == 0 ? null : upd3.get(0).get(0)
+                        log.info("incr ts=${c1} tstz=${c2} comp=${c3} 
ts_upd=${p1} tstz_upd=${p2} comp_upd=${p3} dels=${del1}/${del2}/${del3}")
+                        c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && 
c3.get(0).get(0) == 5 &&
+                                p1 == 'B2_upd' && p2 == 'B2_upd' && p3 == 
'C3_upd' &&
+                                del1.get(0).get(0) == 0 && del2.get(0).get(0) 
== 0 && del3.get(0).get(0) == 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        qt_select_after_incr_timestamp_pk """select event_ts, payload from 
${currentDb}.${tableTs} order by event_ts asc"""
+        qt_select_after_incr_timestamptz_pk """select event_ts, payload from 
${currentDb}.${tableTstz} order by event_ts asc"""
+        qt_select_after_incr_composite_pk """select event_ts, id, payload from 
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to