This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2c834003fc [regression-test](streaming-job) add cdc cases for
source/jdbc timezone and TIMESTAMP/timestamptz pk (#63543)
e2c834003fc is described below
commit e2c834003fcf5bb6f863639e0792a80adb65df02
Author: wudi <[email protected]>
AuthorDate: Fri Jun 5 11:12:04 2026 +0800
[regression-test](streaming-job) add cdc cases for source/jdbc timezone and
TIMESTAMP/timestamptz pk (#63543)
### What problem does this PR solve?
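Add CDC streaming-job regression coverage for timezone behavior and
TIMESTAMP / timestamptz chunk-key paths that the existing suites do not
exercise. The commit also carries two cdc_client changes the new cases
rely on:
- `DebeziumJsonDeserializer` handles Debezium `ZonedTime` values (PostgreSQL
  `timetz`) by passing the offset-bearing string through unchanged rather
  than shifting it into `serverTimeZone`.
- `AbstractCdcSourceReader` can restore `java.sql.Timestamp` chunk bounds
  that were serialized as ISO-8601 strings (with or without a zone offset).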
#### Cases added
| Case | Tables | Guards |
|---|---|---|
| `*_source_timezone` (mysql + pg) | TIMESTAMP/timestamptz multi-precision + DATETIME/timestamp + DATE/date (pg: + `timetz`) | source-side multi-tz INSERT (+08 / -05 / UTC), NULL, UPDATE crossing tz, epoch boundary; pg `timetz` column kept as a regression guard for the upstream JVM-tz handling |
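| `*_jdbc_servertimezone` (mysql + pg) | TIMESTAMP/timestamptz + DATETIME/timestamp (pg: + `timetz`) | recommended end-to-end config — align `jdbc_url`'s `serverTimezone`/`timezone` with the **source** database's time zone (fixed `+09:00`, deliberately different from Doris's default `+08:00`), so cdc renders the same wall clock the source shows regardless of Doris's session `time_zone` (which is only logged at runtime) |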
| `*_timestamp_pk` (mysql + pg) | mysql TIMESTAMP(6) + composite (TIMESTAMP, id); pg timestamp(6) + timestamptz(6) + composite (timestamptz, id) | chunk-key split + INSERT/UPDATE/DELETE locating on TIMESTAMP/timestamptz PK; depends on #63471 for `LocalDateTime` / `OffsetDateTime` chunk-bound restore in `AbstractCdcSourceReader.convertBound` |
---
.../deserialize/DebeziumJsonDeserializer.java | 15 ++
.../source/reader/AbstractCdcSourceReader.java | 23 ++-
.../deserialize/DebeziumJsonDeserializerTest.java | 50 +++++
...est_streaming_mysql_job_jdbc_servertimezone.out | 14 ++
.../test_streaming_mysql_job_source_timezone.out | 29 +++
.../cdc/test_streaming_mysql_job_timestamp_pk.out | 28 +++
..._streaming_postgres_job_jdbc_servertimezone.out | 15 ++
...test_streaming_postgres_job_source_timezone.out | 29 +++
.../test_streaming_postgres_job_timestamp_pk.out | 42 ++++
..._streaming_mysql_job_jdbc_servertimezone.groovy | 144 +++++++++++++
...test_streaming_mysql_job_source_timezone.groovy | 218 ++++++++++++++++++++
.../test_streaming_mysql_job_timestamp_pk.groovy | 167 +++++++++++++++
...reaming_postgres_job_jdbc_servertimezone.groovy | 154 ++++++++++++++
...t_streaming_postgres_job_source_timezone.groovy | 226 +++++++++++++++++++++
...test_streaming_postgres_job_timestamp_pk.groovy | 193 ++++++++++++++++++
15 files changed, 1346 insertions(+), 1 deletion(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 6881dd70d40..8487975f7d1 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -69,6 +69,7 @@ import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import lombok.Getter;
import lombok.Setter;
@@ -255,6 +256,8 @@ public class DebeziumJsonDeserializer
return convertTimestamp(name, dbzObj);
case ZonedTimestamp.SCHEMA_NAME:
return convertZoneTimestamp(dbzObj);
+ case ZonedTime.SCHEMA_NAME:
+ return convertZoneTime(dbzObj);
case Decimal.LOGICAL_NAME:
return convertDecimal(dbzObj, fieldSchema);
case Bits.LOGICAL_NAME:
@@ -317,6 +320,18 @@ public class DebeziumJsonDeserializer
return dbzObj.toString();
}
+ private Object convertZoneTime(Object dbzObj) {
+ // timetz has no date, so a named zone's DST offset cannot be
resolved. Following
+ // Debezium/PostgreSQL semantics, keep Debezium's UTC-normalized
ZonedTime string as-is
+ // (offset preserved) rather than shifting it into serverTimeZone,
which would drop the
+ // offset and mishandle DST.
+ if (dbzObj instanceof String) {
+ return dbzObj;
+ }
+ LOG.warn("Unable to convert to zone time, default {}", dbzObj);
+ return dbzObj.toString();
+ }
+
private Object convertTimestamp(String typeName, Object dbzObj) {
if (dbzObj instanceof Long) {
switch (typeName) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 6c46482dc6d..74eb534bc5f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -100,13 +100,34 @@ public abstract class AbstractCdcSourceReader implements
SourceReader {
private static final Map<Class<?>, Function<String, Object>> BOUND_PARSERS
=
Map.of(
java.sql.Date.class, java.sql.Date::valueOf,
- java.sql.Timestamp.class, java.sql.Timestamp::valueOf,
+ java.sql.Timestamp.class,
AbstractCdcSourceReader::parseTimestampBound,
java.sql.Time.class, java.sql.Time::valueOf,
java.time.LocalDateTime.class,
java.time.LocalDateTime::parse,
java.time.LocalDate.class, java.time.LocalDate::parse,
java.time.LocalTime.class, java.time.LocalTime::parse,
java.time.OffsetDateTime.class,
java.time.OffsetDateTime::parse);
+ // Offset suffix like "+08:00" / "-05:00" ("Z" is handled separately).
+ private static final java.util.regex.Pattern OFFSET_SUFFIX =
+ java.util.regex.Pattern.compile("[+-]\\d{2}:\\d{2}$");
+
+ /**
+ * A java.sql.Timestamp bound is serialized to FE by Jackson as ISO-8601
(with a zone offset
+ * when WRITE_DATES_AS_TIMESTAMPS is off), which
java.sql.Timestamp#valueOf cannot parse.
+ * Reconstruct instant-faithfully so the rebuilt bound binds back to the
same value flink-cdc
+ * read.
+ */
+ private static java.sql.Timestamp parseTimestampBound(String s) {
+ try {
+ return java.sql.Timestamp.valueOf(s); // legacy SQL form
"yyyy-MM-dd HH:mm:ss[.f]"
+ } catch (IllegalArgumentException notSqlForm) {
+ if (s.endsWith("Z") || OFFSET_SUFFIX.matcher(s).find()) {
+ return
java.sql.Timestamp.from(java.time.OffsetDateTime.parse(s).toInstant());
+ }
+ return
java.sql.Timestamp.valueOf(java.time.LocalDateTime.parse(s)); // ISO local ('T')
+ }
+ }
+
private static Object convertBound(Object v, Class<?> target, ObjectMapper
mapper) {
if (v == null) {
return null;
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
index ac4d7b58226..75b2ffbfe9e 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.source.deserialize;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Method;
+import java.time.ZoneId;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,4 +74,53 @@ class DebeziumJsonDeserializerTest {
throw new RuntimeException(e);
}
}
+
+ // ─── convertZoneTime
──────────────────────────────────────────────────────
+ // timetz arrives as a UTC-normalized ISO string (Debezium ZonedTime). cdc
keeps it
+ // verbatim with the offset preserved, independent of serverTimeZone,
since a
+ // date-less time cannot resolve a named zone's DST. Mirrors
Debezium/PostgreSQL.
+
+ @Test
+ void zoneTime_utc_preservesOffset() {
+ deserializer.setServerTimeZone(ZoneId.of("UTC"));
+ assertEquals("12:00:00.123456Z",
invokeConvertZoneTime("12:00:00.123456Z"));
+ }
+
+ @Test
+ void zoneTime_plus08_serverTimeZoneIgnored() {
+ deserializer.setServerTimeZone(ZoneId.of("+08:00"));
+ // serverTimeZone must not shift timetz; the offset-bearing string is
kept as-is
+ assertEquals("12:00:00.123456Z",
invokeConvertZoneTime("12:00:00.123456Z"));
+ }
+
+ @Test
+ void zoneTime_minus05_serverTimeZoneIgnored() {
+ deserializer.setServerTimeZone(ZoneId.of("-05:00"));
+ assertEquals("01:00:00.123456Z",
invokeConvertZoneTime("01:00:00.123456Z"));
+ }
+
+ @Test
+ void zoneTime_dstZone_notShifted() {
+ // a DST zone's offset is date-dependent; timetz has no date, so it
must not be
+ // shifted -- the input is returned unchanged regardless of New York
DST.
+ deserializer.setServerTimeZone(ZoneId.of("America/New_York"));
+ assertEquals("12:00:00.123456Z",
invokeConvertZoneTime("12:00:00.123456Z"));
+ }
+
+ @Test
+ void zoneTime_wholeSecond_keepsSeconds() {
+ deserializer.setServerTimeZone(ZoneId.of("UTC"));
+ assertEquals("00:00:00Z", invokeConvertZoneTime("00:00:00Z"));
+ }
+
+ private Object invokeConvertZoneTime(Object dbzObj) {
+ try {
+ Method m =
+
DebeziumJsonDeserializer.class.getDeclaredMethod("convertZoneTime",
Object.class);
+ m.setAccessible(true);
+ return m.invoke(deserializer, dbzObj);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
new file mode 100644
index 00000000000..7e95f55810a
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc --
+id int No true \N
+tag varchar(96) Yes false \N NONE
+ts0 datetime Yes false \N NONE
+dt0 datetime Yes false \N NONE
+
+-- !select_snapshot --
+1 snapshot_tokyo 2024-06-15T11:00 2024-06-15T11:00
+
+-- !select_binlog --
+1 snapshot_tokyo 2024-06-15T11:00 2024-06-15T11:00
+2 binlog_tokyo 2024-06-15T11:00 2024-06-15T11:00
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
new file mode 100644
index 00000000000..dbed0d086d1
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc --
+id int No true \N
+tag varchar(96) Yes false \N NONE
+ts0 datetime Yes false \N NONE
+ts3 datetime(3) Yes false \N NONE
+ts6 datetime(6) Yes false \N NONE
+dt0 datetime Yes false \N NONE
+dt6 datetime(6) Yes false \N NONE
+d date Yes false \N NONE
+
+-- !select_snapshot --
+1 snapshot_plus08 2024-06-15T12:00 2024-06-15T12:00:00.123
2024-06-15T12:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+2 snapshot_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123
2024-06-16T01:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00:00.123
2024-06-15T20:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+4 snapshot_null \N \N \N \N \N \N
+5 snapshot_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123
1970-01-01T00:00:01.123456 1970-01-01T08:00:01
1970-01-01T08:00:01.123456 1970-01-01
+
+-- !select_binlog --
+1 snapshot_plus08 2024-06-15T14:00 2024-06-15T12:00:00.123
2024-06-15T12:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+2 snapshot_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123
2024-06-16T01:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00:00.123
2024-06-15T20:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+4 snapshot_null \N \N \N \N \N \N
+5 snapshot_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123
1970-01-01T00:00:01.123456 1970-01-01T08:00:01
1970-01-01T08:00:01.123456 1970-01-01
+101 binlog_plus08 2024-06-15T12:00 2024-06-15T12:00:00.123
2024-06-15T12:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+102 binlog_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123
2024-06-16T01:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+103 binlog_utc 2024-06-15T20:00 2024-06-15T20:00:00.123
2024-06-15T20:00:00.123456 2024-06-15T20:00
2024-06-15T20:00:00.123456 2024-06-15
+105 binlog_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123
1970-01-01T00:00:01.123456 1970-01-01T08:00:01
1970-01-01T08:00:01.123456 1970-01-01
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
new file mode 100644
index 00000000000..2691ac4ac12
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out
@@ -0,0 +1,28 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_timestamp_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B1
+2025-01-01T00:00 C1
+2025-06-15T12:34:56.999999 D1
+2026-01-01T00:00 E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C2
+2024-02-03T23:59:59.999999 4 D2
+2024-02-04T00:00 5 E2
+
+-- !select_after_incr_timestamp_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B2_upd
+2025-01-01T00:00 C1
+2026-01-01T00:00 E1
+2026-06-01T00:00 F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C3_upd
+2024-02-04T00:00 5 E2
+2024-02-05T00:00 6 F3
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
new file mode 100644
index 00000000000..ba312fbce72
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc --
+id int No true \N
+tag text Yes false \N NONE
+ts datetime(6) Yes false \N NONE
+tstz datetime(6) Yes false \N NONE
+ttz text Yes false \N NONE
+
+-- !select_snapshot --
+1 snapshot_tokyo 2024-06-15T11:00 2024-06-15T11:00
02:00:00Z
+
+-- !select_binlog --
+1 snapshot_tokyo 2024-06-15T11:00 2024-06-15T11:00
02:00:00Z
+2 binlog_tokyo 2024-06-15T11:00 2024-06-15T11:00
02:00:00Z
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
new file mode 100644
index 00000000000..a7c3fd1274c
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc --
+id int No true \N
+tag text Yes false \N NONE
+ts datetime(6) Yes false \N NONE
+tstz0 datetime Yes false \N NONE
+tstz3 datetime(3) Yes false \N NONE
+tstz6 datetime(6) Yes false \N NONE
+ttz text Yes false \N NONE
+d date Yes false \N NONE
+
+-- !select_snapshot --
+1 snapshot_plus08 2024-06-15T20:00 2024-06-15T12:00
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 12:00:00.123456Z
2024-06-15
+2 snapshot_minus05 2024-06-15T20:00 2024-06-16T01:00
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 01:00:00.123456Z
2024-06-15
+3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456Z
2024-06-15
+4 snapshot_null \N \N \N \N \N \N
+5 snapshot_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 00:00:01.123456Z
1970-01-01
+
+-- !select_binlog --
+1 snapshot_plus08 2024-06-15T20:00 2024-06-15T14:00
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 12:00:00.123456Z
2024-06-15
+2 snapshot_minus05 2024-06-15T20:00 2024-06-16T01:00
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 01:00:00.123456Z
2024-06-15
+3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456Z
2024-06-15
+4 snapshot_null \N \N \N \N \N \N
+5 snapshot_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 00:00:01.123456Z
1970-01-01
+101 binlog_plus08 2024-06-15T20:00 2024-06-15T12:00
2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 12:00:00.123456Z
2024-06-15
+102 binlog_minus05 2024-06-15T20:00 2024-06-16T01:00
2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 01:00:00.123456Z
2024-06-15
+103 binlog_utc 2024-06-15T20:00 2024-06-15T20:00
2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456Z
2024-06-15
+105 binlog_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01
1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 00:00:01.123456Z
1970-01-01
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
new file mode 100644
index 00000000000..a8ef3d6d9af
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_timestamp_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B1
+2025-01-01T00:00 C1
+2025-06-15T12:34:56.999999 D1
+2026-01-01T00:00 E1
+
+-- !select_snapshot_timestamptz_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B1
+2025-01-01T00:00 C1
+2025-06-15T12:34:56.999999 D1
+2026-01-01T00:00 E1
+
+-- !select_snapshot_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C2
+2024-02-03T23:59:59.999999 4 D2
+2024-02-04T00:00 5 E2
+
+-- !select_after_incr_timestamp_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B2_upd
+2025-01-01T00:00 C1
+2026-01-01T00:00 E1
+2026-06-01T00:00 F2
+
+-- !select_after_incr_timestamptz_pk --
+2024-01-01T00:00 A1
+2024-06-15T12:00:00.123456 B2_upd
+2025-01-01T00:00 C1
+2026-01-01T00:00 E1
+2026-06-01T00:00 F2
+
+-- !select_after_incr_composite_pk --
+2024-02-01T00:00 1 A2
+2024-02-01T00:00 2 B2
+2024-02-02T12:00:00.500 3 C3_upd
+2024-02-04T00:00 5 E2
+2024-02-05T00:00 6 F3
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
new file mode 100644
index 00000000000..5f108efff78
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Recommended end-to-end tz configuration for data fidelity: set jdbc_url's
+// serverTimezone to the SOURCE MySQL session/server tz, so cdc renders the
+// TIMESTAMP instant back to the exact wall clock the source shows. Doris data
+// then stays identical to MySQL, independent of Doris's own session tz.
+//
+// Source tz is +09 (Tokyo), deliberately != Doris default +08, so the case
+// proves the rendering follows the SOURCE tz (not Doris). Both the source
+// session and jdbc_url use the fixed offset '+09:00' (not a named zone):
forcing
+// the connection session to an offset needs no MySQL tz tables, and with no
DST
+// the offset is constant so the result is fully deterministic.
+//
+// Setup (source tz = +09):
+// source SET SESSION time_zone='+09:00', INSERT '2024-06-15 11:00:00'
+// ts0 (TIMESTAMP) -> source-internal UTC instant 2024-06-15 02:00:00Z
+// dt0 (DATETIME) -> literal '2024-06-15 11:00:00'
+// jdbc_url serverTimezone aligned to the SOURCE tz (+09:00)
+//
+// Expectation at Doris (independent of Doris session tz, since cdc renders
+// with the source tz, not Doris's; .out has no dependency on Doris tz):
+// ts0 -> '2024-06-15T11:00' (02:00Z rendered with +09:00 = 11:00, ==
source)
+// dt0 -> '2024-06-15T11:00' (DATETIME has no tz semantics, stored verbatim)
+suite("test_streaming_mysql_job_jdbc_servertimezone",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_jdbc_servertimezone_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_mysql_jdbc_servertimezone"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // jdbc serverTimezone is aligned to the SOURCE db tz (not Doris) so
+ // Doris data matches the source wall clock. Log Doris tz only to show
+ // the result is independent of it.
+ def sourceTz = "+09:00"
+ // %2B is a URL-encoded '+': a literal '+' in a jdbc URL decodes to a
space
+ def jdbcTz = "%2B09:00"
+ log.info("Doris session time_zone = ${(sql "select
@@time_zone")[0][0]}; cdc renders with source tz ${jdbcTz}.")
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ id int primary key,
+ tag varchar(32),
+ ts0 timestamp null,
+ dt0 datetime null
+ ) engine=innodb charset=utf8;
+ """
+
+ sql """SET SESSION time_zone = '${sourceTz}'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1,
'snapshot_tokyo',
+ '2024-06-15 11:00:00', '2024-06-15 11:00:00')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=${jdbcTz}&forceConnectionTimeZoneToSession=true",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select * from ${currentDb}.${table1} order by
id;"""
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """SET SESSION time_zone = '${sourceTz}'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'binlog_tokyo',
+ '2024-06-15 11:00:00', '2024-06-15 11:00:00')"""
+ }
+
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ cnt.get(0).get(0) == 2
+ }
+ )
+
+ qt_select_binlog """select * from ${currentDb}.${table1} order by
id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
new file mode 100644
index 00000000000..27e62a487c2
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify cdc tz handling when source session tz differs from cdc-client tz.
+//
+// MySQL TIMESTAMP stores a UTC instant; the session tz only affects how the
+// wall clock is parsed on write and rendered on read. DATETIME / DATE are
+// wall-clock verbatim, no tz transform.
+//
+// Coverage in one table:
+// * source session tz set to +08 / -05 / +00, same wall clock written ->
+// different UTC instants prove cdc honors source session tz (not a
+// hardcoded offset, not the cdc-client JVM tz).
+// * NULL row across every temporal column.
+// * MySQL TIMESTAMP epoch lower bound ('1970-01-01 00:00:01Z').
+// * Binlog path mirrors snapshot themes, plus an UPDATE that rewrites a
+// TIMESTAMP column under +08.
+//
+// jdbc_url uses serverTimezone=UTC so cdc renders TIMESTAMP back to UTC
+// wall clock regardless of the source session offset.
+suite("test_streaming_mysql_job_source_timezone",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_source_timezone_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_mysql_source_timezone"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """
+ create table ${mysqlDb}.${table1} (
+ id int primary key,
+ tag varchar(32),
+ ts0 timestamp null,
+ ts3 timestamp(3) null,
+ ts6 timestamp(6) null,
+ dt0 datetime null,
+ dt6 datetime(6) null,
+ d date null
+ ) engine=innodb charset=utf8;
+ """
+
+ // id=1: +08 baseline
+ sql """SET SESSION time_zone = '+08:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1,
'snapshot_plus08',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=2: -05 -> same wall clock crosses to next UTC day (20:00 -05
= 01:00 next day UTC)
+ sql """SET SESSION time_zone = '-05:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2,
'snapshot_minus05',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=3: +00 -> wall clock == UTC instant
+ sql """SET SESSION time_zone = '+00:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'snapshot_utc',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=4: NULL across every nullable column
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, tag) VALUES (4,
'snapshot_null')"""
+
+ // id=5: epoch lower bound. MySQL TIMESTAMP min is '1970-01-01
00:00:01' UTC,
+ // so under +08 the smallest legal wall clock is '1970-01-01
08:00:01'.
+ sql """SET SESSION time_zone = '+08:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (5,
'snapshot_epoch_plus08',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123',
+ '1970-01-01 08:00:01.123456',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123456',
+ '1970-01-01')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select * from ${currentDb}.${table1} order by
id;"""
+
+ // Binlog phase: same tz themes through binlog, plus an UPDATE that
+ // rewrites ts0 on id=1 under +08 to confirm UPDATE follows the same
+ // tz codepath as INSERT.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """SET SESSION time_zone = '+08:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (101,
'binlog_plus08',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET SESSION time_zone = '-05:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (102,
'binlog_minus05',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET SESSION time_zone = '+00:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (103, 'binlog_utc',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET SESSION time_zone = '+08:00'"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES (105,
'binlog_epoch_plus08',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123',
+ '1970-01-01 08:00:01.123456',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123456',
+ '1970-01-01')"""
+
+ // UPDATE: id=1 was '20:00 +08' (UTC 12:00). Push wall clock to
'22:00 +08'
+ // (UTC 14:00) so we can poll for completion.
+ sql """UPDATE ${mysqlDb}.${table1} SET ts0 = '2024-06-15 22:00:00'
WHERE id = 1"""
+ }
+
+ // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle.
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ if (cnt.get(0).get(0) != 9) return false
+ def updated = sql """select ts0 from
${currentDb}.${table1} where id = 1"""
+ return
updated.get(0).get(0).toString().startsWith('2024-06-15T14:00')
+ }
+ )
+
+ qt_select_binlog """select * from ${currentDb}.${table1} order by
id;"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
new file mode 100644
index 00000000000..514ff72292d
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Cover the TIMESTAMP chunk-key path. TIMESTAMP is MySQL's epoch-based,
+// session-tz-converted column type, and the driver returns it as
+// java.time.LocalDateTime by default, so the chunk-bound JSON round-trip
+// exercises the LocalDateTime branch in AbstractCdcSourceReader.convertBound.
+//
+// Both source session and jdbc_url are pinned to UTC so the TIMESTAMP wall
+// clock written by INSERT equals the value cdc renders, and the .out can
+// be filled deterministically.
+suite("test_streaming_mysql_job_timestamp_pk",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_timestamp_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableTs = "events_mysql_timestamp_pk"
+ def tableComposite = "events_mysql_timestamp_id_pk"
+ def mysqlDb = "test_cdc_db"
+
+ // Remove Doris-side leftovers (job and both target tables) from a
+ // previous run before touching the source.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableTs} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ // Everything below needs the external docker MySQL; it only runs when
+ // the enableJdbcTest config is "true".
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // Seed phase: create and fill both source tables in a UTC session
+ // (serverTimezone=UTC on the connection plus SET SESSION time_zone),
+ // so the TIMESTAMP literals are stored as the wall clock the .out expects.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """SET SESSION time_zone = '+00:00'"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableTs}"""
+ // Explicit DEFAULT suppresses MySQL 5.7 implicit ON UPDATE
CURRENT_TIMESTAMP on the
+ // first TIMESTAMP column, so UPDATE does not auto-rewrite the
event_ts primary key.
+ sql """CREATE TABLE ${mysqlDb}.${tableTs} (
+ `event_ts` timestamp(6) NOT NULL DEFAULT '1970-01-01
00:00:01',
+ `payload` varchar(64),
+ PRIMARY KEY (`event_ts`)
+ ) ENGINE=InnoDB"""
+ // Single-column TIMESTAMP(6) PK: 5 rows (A1..E1), mixing whole and
+ // fractional seconds.
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-01-01
00:00:00.000000', 'A1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-06-15
12:00:00.123456', 'B1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-01-01
00:00:00.000000', 'C1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-06-15
12:34:56.999999', 'D1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-01-01
00:00:00.000000', 'E1')"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+ `event_ts` timestamp(6) NOT NULL DEFAULT '1970-01-01
00:00:01',
+ `id` int NOT NULL,
+ `payload` varchar(64),
+ PRIMARY KEY (`event_ts`, `id`)
+ ) ENGINE=InnoDB"""
+ // Composite (event_ts, id) PK: ids 1 and 2 share the same event_ts,
+ // so only the second key column tells them apart.
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+ }
+
+ // Streaming job over both tables. jdbc_url is pinned to UTC like the
+ // seed session. snapshot_split_size=2 is small relative to the 5 rows
+ // per table, presumably so the snapshot is read in several chunks
+ // bounded on the TIMESTAMP key (TODO confirm chunking semantics).
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${tableTs},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Snapshot phase: wait until both Doris tables hold all 5 seeded rows.
+ // On timeout, log job/task state before rethrowing.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableTs}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ log.info("snapshot row count ts=${c1} composite=${c2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_timestamp_pk """select event_ts, payload from
${currentDb}.${tableTs} order by event_ts asc"""
+ qt_select_snapshot_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+ // Incremental phase: per table, one INSERT plus one UPDATE and one
+ // DELETE addressed by the TIMESTAMP (or TIMESTAMP, id) key. The net
+ // row count stays at 5.
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") {
+ sql """SET SESSION time_zone = '+00:00'"""
+
+ sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-06-01
00:00:00.000000', 'F2')"""
+ sql """UPDATE ${mysqlDb}.${tableTs} SET payload='B2_upd' WHERE
event_ts='2024-06-15 12:00:00.123456'"""
+ sql """DELETE FROM ${mysqlDb}.${tableTs} WHERE
event_ts='2025-06-15 12:34:56.999999'"""
+
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+ sql """UPDATE ${mysqlDb}.${tableComposite} SET payload='C3_upd'
WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+ sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+ }
+
+ // Wait until the counts are back to 5, both updated payloads are
+ // visible and both deleted keys are gone in Doris.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableTs}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ def upd1 = sql """select payload from
${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'"""
+ def upd2 = sql """select payload from
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and
id=3"""
+ def del1 = sql """select count(1) from
${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'"""
+ def del2 = sql """select count(1) from
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and
id=4"""
+ def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ log.info("incr ts=${c1} composite=${c2} ts_upd=${p1}
comp_upd=${p2} ts_del=${del1} comp_del=${del2}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
+ p1 == 'B2_upd' && p2 == 'C3_upd' &&
+ del1.get(0).get(0) == 0 && del2.get(0).get(0)
== 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr_timestamp_pk """select event_ts, payload from
${currentDb}.${tableTs} order by event_ts asc"""
+ qt_select_after_incr_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+ // Teardown: drop the job and check it no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
new file mode 100644
index 00000000000..13b2e4c09ab
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// PG counterpart of test_streaming_mysql_job_jdbc_servertimezone.
+//
+// Recommended end-to-end tz configuration for data fidelity: set jdbc_url's
+// timezone to the SOURCE PG session/server tz, so cdc renders the timestamptz
+// instant back to the exact wall clock the source shows. Doris data then stays
+// identical to PG, independent of Doris's own session tz.
+//
+// Source tz is Asia/Tokyo (+09, no DST), deliberately != Doris default +08, so
+// the case proves the rendering follows the SOURCE tz (not Doris). The source
+// session uses the offset '+09:00' while jdbc_url uses the IANA name
+// 'Asia/Tokyo' (cdc resolves it via ZoneId).
+//
+// timetz is NOT rendered into the source tz: a time-of-day has no date, so a
+// named zone's DST offset cannot be resolved. Mirroring Debezium/PostgreSQL,
+// cdc keeps timetz UTC-normalized with its offset (e.g. '02:00Z').
+//
+// Setup (source tz = Asia/Tokyo = +09):
+// source SET TIME ZONE INTERVAL '+09:00' HOUR TO MINUTE, INSERT '2024-06-15
11:00:00'
+// ts (timestamp) -> literal '2024-06-15 11:00:00'
+// tstz (timestamptz) -> source-internal UTC instant 2024-06-15 02:00:00Z
+// ttz (timetz) -> source-internal UTC time 02:00:00Z
+// jdbc_url timezone aligned to the SOURCE tz (Asia/Tokyo)
+//
+// Expectation at Doris (independent of Doris session tz, since cdc renders
+// with the source tz, not Doris's; .out has no dependency on Doris tz):
+// ts -> '2024-06-15T11:00' (verbatim, no tz semantics)
+// tstz -> '2024-06-15T11:00' (02:00Z rendered with Asia/Tokyo +09 = 11:00,
== source)
+// ttz -> '02:00:00Z' (UTC-normalized time-of-day, kept as-is; not
zone-rendered)
+suite("test_streaming_postgres_job_jdbc_servertimezone",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_jdbc_servertimezone_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_pg_jdbc_servertimezone"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Remove Doris-side leftovers from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // jdbc timezone is aligned to the SOURCE db tz (not Doris) so Doris
+ // data matches the source wall clock. Log Doris tz only to show the
+ // result is independent of it.
+ // sourceTz is the offset applied to every source PG session before a
+ // write; jdbcTz is the equivalent IANA zone handed to cdc via jdbc_url.
+ // Both must describe the same zone (Asia/Tokyo is +09 with no DST).
+ def sourceTz = "+09:00"
+ def jdbcTz = "Asia/Tokyo"
+ log.info("Doris session time_zone = ${(sql "select
@@time_zone")[0][0]}; cdc renders with source tz ${jdbcTz}.")
+
+ // Snapshot seed: one row written under the source tz.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(32),
+ ts timestamp,
+ tstz timestamp with time zone,
+ ttz time with time zone
+ );
+ """
+
+ sql """SET TIME ZONE INTERVAL '${sourceTz}' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'snapshot_tokyo',
+ '2024-06-15 11:00:00', '2024-06-15 11:00:00', '11:00:00')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=${jdbcTz}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Snapshot phase: wait for the single seeded row; dump job/task
+ // state on timeout.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 1
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select * from ${currentDb}.${table1} order by
id;"""
+
+ // Incremental phase: same values written under the same source tz.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """SET TIME ZONE INTERVAL '${sourceTz}' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'binlog_tokyo',
+ '2024-06-15 11:00:00', '2024-06-15 11:00:00', '11:00:00')"""
+ }
+
+ // Wait for the binlog row; dump job/task state on timeout, the
+ // same way the snapshot wait does.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from ${currentDb}.${table1}"""
+ log.info("binlog row count: " + cnt)
+ cnt.get(0).get(0) == 2
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job (binlog): " + showjob)
+ log.info("show task (binlog): " + showtask)
+ throw ex
+ }
+
+ qt_select_binlog """select * from ${currentDb}.${table1} order by
id;"""
+
+ // Teardown: drop the job and check it no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
new file mode 100644
index 00000000000..f9beddd3e00
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// PG counterpart of test_streaming_mysql_job_source_timezone.
+//
+// PG temporal semantics relevant to cdc tz handling:
+// timestamp - wall clock, no tz; debezium emits epoch-style schema, cdc
+// bypasses serverTimeZone.
+// timestamptz - normalized to UTC on write per session TimeZone; debezium
+// emits ZonedTimestamp ISO string, cdc renders it using
+// serverTimeZone.
+// timetz - time-of-day with offset. Debezium emits ZonedTime, a
+// UTC-normalized OffsetTime; cdc keeps it as-is (offset
+// preserved) rather than rendering into a named zone, since a
+// date-less time cannot resolve DST. Mirrors
Debezium/PostgreSQL.
+// date - no tz; literal day, must not drift across +08 boundary.
+//
+// Coverage:
+// * source session tz set to +08 / -05 / +00 with the same wall clock
+// written each time -> the resulting different UTC instants for
+// timestamptz prove cdc honors the source session tz.
+// * NULL row across every temporal column.
+// * epoch lower bound ('1970-01-01 00:00:01Z').
+// * Binlog path mirrors snapshot themes, plus an UPDATE on tstz0 under +08.
+//
+// jdbc_url uses timezone=UTC so cdc renders timestamptz back to UTC wall
+// clock regardless of the source session offset.
+suite("test_streaming_postgres_job_source_timezone",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_source_timezone_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_pg_source_timezone"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Remove Doris-side leftovers from a previous run.
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Snapshot seed: 5 rows (ids 1..5), each written after switching the
+ // source session tz, so the order of SET TIME ZONE / INSERT matters.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id integer PRIMARY KEY,
+ tag varchar(32),
+ ts timestamp,
+ tstz0 timestamptz(0),
+ tstz3 timestamptz(3),
+ tstz6 timestamptz(6),
+ ttz time with time zone,
+ d date
+ );
+ """
+
+ // INTERVAL form avoids depending on the container's tzdata.
+ // id=1: +08 baseline
+ sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,
'snapshot_plus08',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=2: -05 -> same wall clock crosses to next UTC day for
timestamptz
+ sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,
'snapshot_minus05',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=3: +00 -> wall clock == UTC instant
+ sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3,
'snapshot_utc',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ // id=4: NULL across every nullable column
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES
(4, 'snapshot_null')"""
+
+ // id=5: epoch lower bound under +08
+ sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5,
'snapshot_epoch_plus08',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123',
+ '1970-01-01 08:00:01.123456',
+ '08:00:01.123456',
+ '1970-01-01')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Snapshot phase: wait for all 5 seeded rows; dump job/task state on
+ // timeout.
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_desc """desc ${currentDb}.${table1};"""
+ qt_select_snapshot """select * from ${currentDb}.${table1} order by
id;"""
+
+ // Binlog phase: same tz themes, plus an UPDATE that rewrites tstz0
+ // on id=1 under +08 to confirm UPDATE follows the same tz codepath.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101,
'binlog_plus08',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102,
'binlog_minus05',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103,
'binlog_utc',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00',
+ '2024-06-15 20:00:00.123',
+ '2024-06-15 20:00:00.123456',
+ '20:00:00.123456',
+ '2024-06-15')"""
+
+ sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (105,
'binlog_epoch_plus08',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01',
+ '1970-01-01 08:00:01.123',
+ '1970-01-01 08:00:01.123456',
+ '08:00:01.123456',
+ '1970-01-01')"""
+
+ // UPDATE: id=1 tstz0 was '20:00 +08' (UTC 12:00). Push wall clock
to
+ // '22:00 +08' (UTC 14:00) so we can poll for completion.
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET tstz0 =
'2024-06-15 22:00:00' WHERE id = 1"""
+ }
+
+ // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle: 5 snapshot +
+ // 4 binlog rows = 9, and id=1 tstz0 must show the UTC-rendered 14:00.
+ // Dump job/task state on timeout, the same way the snapshot wait does.
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from ${currentDb}.${table1}"""
+ log.info("binlog row count: " + cnt)
+ if (cnt.get(0).get(0) != 9) return false
+ def updated = sql """select tstz0 from ${currentDb}.${table1} where id = 1"""
+ return updated.get(0).get(0).toString().startsWith('2024-06-15T14:00')
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job (binlog): " + showjob)
+ log.info("show task (binlog): " + showtask)
+ throw ex
+ }
+
+ qt_select_binlog """select * from ${currentDb}.${table1} order by
id;"""
+
+ // Teardown: drop the job and check it no longer appears in jobs().
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
new file mode 100644
index 00000000000..02939cc2307
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Cover PG timestamp / timestamptz chunk-key paths: timestamp (no tz,
+// driver returns LocalDateTime), timestamptz (driver returns
+// OffsetDateTime), plus a composite (timestamptz, id) PK to exercise
+// multi-column locating.
+//
+// Source SET TIME ZONE = UTC and jdbc_url timezone=UTC so the timestamptz
+// wall clock written by INSERT equals the value cdc renders, and the .out
+// can be filled deterministically.
+suite("test_streaming_postgres_job_timestamp_pk",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_timestamp_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableTs = "events_pg_timestamp_pk"
+ def tableTstz = "events_pg_timestamptz_pk"
+ def tableComposite = "events_pg_timestamptz_id_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableTs} force"""
+ sql """drop table if exists ${currentDb}.${tableTstz} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTs}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTs} (
+ event_ts timestamp(6) NOT NULL,
+ payload varchar(64),
+ PRIMARY KEY (event_ts)
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2024-01-01 00:00:00.000000', 'A1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2024-06-15 12:00:00.123456', 'B1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2025-01-01 00:00:00.000000', 'C1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2025-06-15 12:34:56.999999', 'D1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2026-01-01 00:00:00.000000', 'E1')"""
+
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTstz}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTstz} (
+ event_ts timestamptz(6) NOT NULL,
+ payload varchar(64),
+ PRIMARY KEY (event_ts)
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2024-01-01 00:00:00.000000', 'A1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2024-06-15 12:00:00.123456', 'B1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2025-01-01 00:00:00.000000', 'C1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2025-06-15 12:34:56.999999', 'D1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2026-01-01 00:00:00.000000', 'E1')"""
+
+ sql """DROP TABLE IF EXISTS
${pgDB}.${pgSchema}.${tableComposite}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+ event_ts timestamptz(6) NOT NULL,
+ id integer NOT NULL,
+ payload varchar(64),
+ PRIMARY KEY (event_ts, id)
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 1, 'A2')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-01 00:00:00.000000', 2, 'B2')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-02 12:00:00.500000', 3, 'C2')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-03 23:59:59.999999', 4, 'D2')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-04 00:00:00.000000', 5, 'E2')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" =
"${tableTs},${tableTstz},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableTs}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableTstz}"""
+ def c3 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ log.info("snapshot row count ts=${c1} tstz=${c2}
composite=${c3}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
c3.get(0).get(0) == 5
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_timestamp_pk """select event_ts, payload from
${currentDb}.${tableTs} order by event_ts asc"""
+ qt_select_snapshot_timestamptz_pk """select event_ts, payload from
${currentDb}.${tableTstz} order by event_ts asc"""
+ qt_select_snapshot_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES
('2026-06-01 00:00:00.000000', 'F2')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableTs} SET payload='B2_upd'
WHERE event_ts='2024-06-15 12:00:00.123456'"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTs} WHERE
event_ts='2025-06-15 12:34:56.999999'"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES
('2026-06-01 00:00:00.000000', 'F2')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableTstz} SET
payload='B2_upd' WHERE event_ts='2024-06-15 12:00:00.123456'"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTstz} WHERE
event_ts='2025-06-15 12:34:56.999999'"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES
('2024-02-05 00:00:00.000000', 6, 'F3')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET
payload='C3_upd' WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE
event_ts='2024-02-03 23:59:59.999999' AND id=4"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def c1 = sql """select count(1) from
${currentDb}.${tableTs}"""
+ def c2 = sql """select count(1) from
${currentDb}.${tableTstz}"""
+ def c3 = sql """select count(1) from
${currentDb}.${tableComposite}"""
+ def upd1 = sql """select payload from
${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'"""
+ def upd2 = sql """select payload from
${currentDb}.${tableTstz} where event_ts='2024-06-15 12:00:00.123456'"""
+ def upd3 = sql """select payload from
${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and
id=3"""
+ def del1 = sql """select count(1) from
${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'"""
+ def del2 = sql """select count(1) from
${currentDb}.${tableTstz} where event_ts='2025-06-15 12:34:56.999999'"""
+ def del3 = sql """select count(1) from
${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and
id=4"""
+ def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0)
+ def p3 = upd3.size() == 0 ? null : upd3.get(0).get(0)
+ log.info("incr ts=${c1} tstz=${c2} comp=${c3}
ts_upd=${p1} tstz_upd=${p2} comp_upd=${p3} dels=${del1}/${del2}/${del3}")
+ c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 &&
c3.get(0).get(0) == 5 &&
+ p1 == 'B2_upd' && p2 == 'B2_upd' && p3 ==
'C3_upd' &&
+ del1.get(0).get(0) == 0 && del2.get(0).get(0)
== 0 && del3.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ qt_select_after_incr_timestamp_pk """select event_ts, payload from
${currentDb}.${tableTs} order by event_ts asc"""
+ qt_select_after_incr_timestamptz_pk """select event_ts, payload from
${currentDb}.${tableTstz} order by event_ts asc"""
+ qt_select_after_incr_composite_pk """select event_ts, id, payload from
${currentDb}.${tableComposite} order by event_ts asc, id asc"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]