Copilot commented on code in PR #63219:
URL: https://github.com/apache/doris/pull/63219#discussion_r3238605169


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
     protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
     protected Map<TableId, TableChanges.TableChange> tableSchemas;
 
+    private final Map<String, Class<?>> splitKeyClassCache = new ConcurrentHashMap<>();
+
+    protected abstract Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+    protected Class<?> resolveSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        String key = tableId.identifier() + "." + splitColumn.name();
+        return splitKeyClassCache.computeIfAbsent(
+                key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+    }
+
+    protected static Object[] convertBounds(Object[] raw, Class<?> target, ObjectMapper mapper) {
+        if (raw == null) {
+            return null;
+        }
+        Object[] out = new Object[raw.length];
+        for (int i = 0; i < raw.length; i++) {
+            out[i] = convertBound(raw[i], target, mapper);
+        }
+        return out;
+    }
+
+    private static Object convertBound(Object v, Class<?> target, ObjectMapper mapper) {
+        if (v == null) {
+            return null;
+        }
+        if (target.isInstance(v)) {
+            return v;
+        }
+        String s = v.toString();
+        if (target == java.sql.Date.class) {
+            return java.sql.Date.valueOf(s);
+        }
+        if (target == java.sql.Timestamp.class) {
+            return java.sql.Timestamp.valueOf(s);
+        }
+        if (target == java.sql.Time.class) {
+            return java.sql.Time.valueOf(s);
+        }
+        return mapper.convertValue(v, target);

Review Comment:
   `convertBound` only supports converting `java.sql.Date`/`Timestamp`/`Time` 
from values whose `toString()` matches the `Date.valueOf`/`Timestamp.valueOf` 
grammar (`yyyy-[m]m-[d]d[ hh:mm:ss[.fffffffff]]`). The unit tests cover only 
String values in that exact format. If FE Jackson ever serializes a 
`java.sql.Timestamp` as either epoch milliseconds (Jackson's default with 
`WRITE_DATES_AS_TIMESTAMPS=true`) or an ISO-8601 string with a `T` 
separator/zone offset (when that feature is disabled or a custom serializer is 
in use), the resulting `v.toString()` will not match `Timestamp.valueOf` and 
this method will throw `IllegalArgumentException`, breaking the very round-trip 
this fix targets. Consider routing through `mapper.convertValue` (or an 
explicit `Instant`/`LocalDateTime` parse) when the string doesn't match the 
JDBC literal grammar, and add a unit test that feeds the actual FE-serialized 
form for Timestamp (and Date as a Long, if applicable) to lock the contract 
down.
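
A minimal sketch of the fallback suggested above, using only the JDK so that it does not depend on how the FE's `ObjectMapper` is configured. `LenientTimestampBounds` and `toTimestamp` are hypothetical names, not part of the PR, and the set of accepted forms (epoch milliseconds, JDBC literal, ISO-8601 with or without an offset) is an assumption about what the FE might send:

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;

/** Hypothetical helper for illustration; not part of the PR. */
final class LenientTimestampBounds {
    private LenientTimestampBounds() {}

    /** Converts a restored split bound to a Timestamp, trying several wire forms in turn. */
    static Timestamp toTimestamp(Object v) {
        if (v == null) {
            return null;
        }
        if (v instanceof Timestamp) {
            return (Timestamp) v;
        }
        if (v instanceof Number) {
            // Assumed to be epoch milliseconds.
            return new Timestamp(((Number) v).longValue());
        }
        String s = v.toString().trim();
        if (s.matches("-?\\d+")) {
            // Epoch milliseconds that arrived as a string (assumption).
            return new Timestamp(Long.parseLong(s));
        }
        try {
            // JDBC literal: yyyy-[m]m-[d]d hh:mm:ss[.f...]
            return Timestamp.valueOf(s);
        } catch (IllegalArgumentException ignored) {
            // Fall through to the ISO-8601 forms.
        }
        try {
            // ISO-8601 without a zone, e.g. 2025-01-02T03:04:05 (read in the JVM default zone).
            return Timestamp.valueOf(LocalDateTime.parse(s));
        } catch (DateTimeParseException ignored) {
            // Fall through to the offset form.
        }
        try {
            // ISO-8601 with an offset or Z, e.g. 2025-01-02T03:04:05+08:00.
            return Timestamp.from(OffsetDateTime.parse(s).toInstant());
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Unsupported timestamp bound: " + s, e);
        }
    }
}
```

A unit test along the lines requested above could feed each of these forms through the helper; the form the FE really emits still has to be confirmed against the FE serializer.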



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
     protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
     protected Map<TableId, TableChanges.TableChange> tableSchemas;
 
+    private final Map<String, Class<?>> splitKeyClassCache = new ConcurrentHashMap<>();
+
+    protected abstract Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+    protected Class<?> resolveSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        String key = tableId.identifier() + "." + splitColumn.name();
+        return splitKeyClassCache.computeIfAbsent(
+                key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+    }

Review Comment:
   `splitKeyClassCache` is keyed only by `tableId.identifier() + "." + 
splitColumn.name()` and never invalidated. Two concerns: (1) the same 
`AbstractCdcSourceReader` instance, if reused across jobs pointing at different 
data sources that happen to share `schema.table.column` names (a realistic 
scenario for tenanted/staging vs production), will silently return the class 
probed from the first job's JDBC URL — leading to the same kind of "wrong 
restored type" bug this fix is intended to prevent. (2) After a DDL that alters 
the split key column's type, the stale class will keep being used for the 
lifetime of the reader. Consider including a datasource identifier (e.g., 
jdbc_url) in the cache key, and/or invalidating the entry on schema-change 
events.
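
A minimal sketch of a cache keyed by datasource as well as table and column, with a per-table invalidation hook. `SplitKeyClassCache`, `resolve`, and `invalidateTable` are hypothetical names, and using the JDBC URL as the datasource identifier is an assumption taken from the suggestion above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical stand-in for splitKeyClassCache; names are illustrative only. */
final class SplitKeyClassCache {
    // Key is (dataSourceId, tableId, column); a List gives value-based equals/hashCode.
    private final Map<List<String>, Class<?>> cache = new ConcurrentHashMap<>();

    /** Returns the cached class, probing at most once per (datasource, table, column). */
    Class<?> resolve(
            String dataSourceId, String tableId, String column, Supplier<Class<?>> probe) {
        return cache.computeIfAbsent(
                Arrays.asList(dataSourceId, tableId, column), k -> probe.get());
    }

    /** Drops every cached column class for one table, e.g. on a schema-change event. */
    void invalidateTable(String dataSourceId, String tableId) {
        cache.keySet()
                .removeIf(k -> k.get(0).equals(dataSourceId) && k.get(1).equals(tableId));
    }
}
```

With this shape, two jobs that share `schema.table.column` names but point at different URLs get separate entries, and a DDL handler can call `invalidateTable` so the next lookup probes again.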



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -614,16 +614,25 @@ protected abstract Fetcher<SourceRecords, SourceSplitBase> getBinlogSplitReader(
             createSnapshotSplit(Map<String, Object> offset, JobBaseConfig jobConfig) {
         SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, SnapshotSplit.class);
         TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
-        Object[] splitStart = snapshotSplit.getSplitStart();
-        Object[] splitEnd = snapshotSplit.getSplitEnd();
         List<String> splitKeys = snapshotSplit.getSplitKey();
         Map<TableId, TableChanges.TableChange> tableSchemas = getTableSchemas(jobConfig);
         TableChanges.TableChange tableChange = tableSchemas.get(tableId);
         Preconditions.checkNotNull(
                 tableChange, "Can not find table " + tableId + " in job " + jobConfig.getJobId());
         // only support one split key
         String splitKey = splitKeys.get(0);
-        io.debezium.relational.Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Preconditions.checkNotNull(
+                splitColumn,
+                "Split key column "
+                        + splitKey
+                        + " not found in table "
+                        + tableId
+                        + " for job "
+                        + jobConfig.getJobId());
+        Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, jobConfig);
+        Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), keyClass, objectMapper);
+        Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), keyClass, objectMapper);

Review Comment:
   Near-identical `Preconditions.checkNotNull(tableChange, …)` / 
`Preconditions.checkNotNull(splitColumn, "Split key column …")` blocks were 
added at four call sites: `JdbcIncrementalSourceReader.createSnapshotSplit`, 
`JdbcIncrementalSourceReader.createStreamSplit`, 
`MySqlSourceReader.createSnapshotSplit`, and 
`MySqlSourceReader.createBinlogSplit`. Consider extracting a small helper on 
`AbstractCdcSourceReader` (e.g. `resolveSplitColumn(TableId, String splitKey, 
Map<...> schemas, JobBaseConfig)`) that performs both null checks plus the 
`resolveSplitKeyClass` lookup and returns the column and class together. This 
would shrink the diff, keep the error messages consistent if they ever need to 
evolve, and remove the risk of one of the four copies drifting from the others.
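
A rough sketch of what such a helper could look like. To stay self-contained it uses plain stand-ins rather than the Debezium types: table ids and job ids are strings, a schema is a map from column name to a column object of type `C`, and `SplitColumnResolver`, `Resolved`, and `keyClassResolver` are hypothetical names. `Objects.requireNonNull` stands in for `Preconditions.checkNotNull`:

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical helper shape for illustration; not part of the PR. */
final class SplitColumnResolver {
    private SplitColumnResolver() {}

    /** The split column together with the Java class resolved for it. */
    static final class Resolved<C> {
        final C column;
        final Class<?> keyClass;

        Resolved(C column, Class<?> keyClass) {
            this.column = column;
            this.keyClass = keyClass;
        }
    }

    /** Performs both null checks and the key-class lookup in one place. */
    static <C> Resolved<C> resolveSplitColumn(
            String tableId,
            String splitKey,
            Map<String, Map<String, C>> schemas,
            String jobId,
            Function<C, Class<?>> keyClassResolver) {
        Map<String, C> columns =
                Objects.requireNonNull(
                        schemas.get(tableId),
                        "Can not find table " + tableId + " in job " + jobId);
        C column =
                Objects.requireNonNull(
                        columns.get(splitKey),
                        "Split key column " + splitKey + " not found in table "
                                + tableId + " for job " + jobId);
        return new Resolved<>(column, keyClassResolver.apply(column));
    }
}
```

Each of the four call sites would then reduce to one call plus the two `convertBounds` invocations, and the error messages would live in a single place.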



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -366,6 +369,26 @@ protected DataType fromDbzColumn(Column splitColumn) {
         return PostgresTypeUtils.fromDbzColumn(splitColumn);
     }
 
+    @Override
+    protected Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        String sql =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1=0",
+                        PostgresQueryUtils.quote(splitColumn.name()),
+                        PostgresQueryUtils.quote(tableId));
+        try (JdbcConnection jdbc =
+                        new PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig);
+                Statement st = jdbc.connection().createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return Class.forName(rs.getMetaData().getColumnClassName(1));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Probe split key class failed for " + tableId + "." + splitColumn.name(), e);
+        }
+    }

Review Comment:
   `probeSplitKeyClass` opens a brand-new JDBC connection on every cache miss 
(one per `table.column`). For a job with many tables this can multiply quickly, 
and any transient JDBC failure (network blip, momentary lock, brief auth issue) 
immediately throws `RuntimeException` and fails the whole snapshot-split 
creation with no retry — by design per the PR description, but worth 
confirming. Consider at least one retry with backoff inside 
`probeSplitKeyClass` to avoid making well-known transient failure modes (e.g. 
PG's `cannot execute ... in a read-only transaction` during failover) bubble up 
as terminal job failures. The same comment applies to the analogous override in 
`MySqlSourceReader.probeSplitKeyClass`.
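
A minimal sketch of a bounded retry with exponential backoff that could wrap the probe. `ProbeRetry` and `withRetry` are hypothetical names, and the caller supplies the predicate that decides which failures are worth retrying, since that classification is driver-specific:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical retry wrapper for illustration; not part of the PR. */
final class ProbeRetry {
    private ProbeRetry() {}

    /** Runs the action up to maxAttempts times, doubling the wait between attempts. */
    static <T> T withRetry(
            Supplier<T> action,
            Predicate<RuntimeException> retryable,
            int maxAttempts,
            long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // Give up on the last attempt or on errors the caller deems permanent.
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
            }
            try {
                Thread.sleep(backoff);
            } catch (InterruptedException ie) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to retry", ie);
            }
            backoff *= 2; // exponential backoff
        }
    }
}
```

A caller might use something like `withRetry(() -> probeSplitKeyClass(tableId, splitColumn, jobConfig), e -> e.getCause() instanceof java.sql.SQLTransientException, 3, 200L)`. Whether a given driver reports failover errors as `SQLTransientException` is an assumption that would need checking.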



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -670,12 +680,29 @@ private Tuple2<SourceSplitBase, Boolean> createStreamSplit(
                 if (maxOffsetFinishSplits == null || sourceOffset.isAfter(maxOffsetFinishSplits)) {
                     maxOffsetFinishSplits = sourceOffset;
                 }
+                TableId tid = TableId.parse(split.getTableId());

Review Comment:
   Inconsistent use of `TableId.parse`: 
`JdbcIncrementalSourceReader.createSnapshotSplit` (line 616) calls 
`TableId.parse(snapshotSplit.getTableId(), false)`, while the new code in 
`createStreamSplit` (line 683) and the `MySqlSourceReader` paths call the 
single-arg `TableId.parse(split.getTableId())`. In Debezium the boolean is 
`useCatalogBeforeSchema`, and the single-arg overload defaults it to `true`: it 
decides whether a two-part identifier is read as `catalog.table` or 
`schema.table`, and does not change case. The two paths can therefore build 
`TableId`s whose catalog and schema fields differ for the same table. Since 
`resolveSplitKeyClass` and the probe queries are driven by that `TableId`, this 
can hide a bug if snapshot and stream split creation disagree on how the 
identifier is interpreted. Please align the parse mode across all four sites.
   



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy:
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_date_pk", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_date_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDate = "events_pg_date_pk"
+    def tableComposite = "events_pg_date_id_pk"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDate} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} (
+                  "event_date" date PRIMARY KEY,
+                  "payload" varchar(200)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-01', 'A1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-02', 'B1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-03', 'C1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-04', 'D1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-05', 'E1');"""
+
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableComposite}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+                  "event_date" date NOT NULL,
+                  "id" int4 NOT NULL,
+                  "payload" varchar(200),
+                  PRIMARY KEY ("event_date", "id")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${tableDate},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "1",
+                    "snapshot_parallelism" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        jobSuccendCount.size() == 1 && '5' <= jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+        qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02';"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE event_date = '2025-01-01';"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1;"""
+        }
+
+        sleep(60000)
+

Review Comment:
   A 60-second hard `sleep` after issuing CDC DML and before asserting the 
`qt_select_binlog_*` snapshots is brittle: on a slow CI host the binlog/stream 
phase may not yet have propagated all events, producing flaky failures. The 
earlier `Awaitility.await()` pattern used for the snapshot phase scales much 
better; consider polling on the expected row count (or job state) instead of a 
fixed sleep. The same applies to the analogous test in 
`test_streaming_mysql_job_date_pk.groovy`.
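
For illustration, the poll-until-deadline idea can be sketched with the JDK alone; in the Groovy suite itself the existing `Awaitility.await()` call would be the natural choice. `PollUntil` and `await` are hypothetical names, and the condition stands in for a query such as "the target table has the expected row count":

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper for illustration; not part of the PR. */
final class PollUntil {
    private PollUntil() {}

    /**
     * Re-checks the condition every interval until it holds or the timeout elapses.
     * Returns true as soon as the condition holds, false on timeout or interrupt.
     */
    static boolean await(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(Math.max(1L, interval.toMillis()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Compared with a fixed `sleep(60000)`, this returns as soon as the events have propagated on a fast host and can be given a more generous upper bound for a slow one.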
   


