Copilot commented on code in PR #63219:
URL: https://github.com/apache/doris/pull/63219#discussion_r3238605169
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
protected Map<TableId, TableChanges.TableChange> tableSchemas;
+ private final Map<String, Class<?>> splitKeyClassCache = new ConcurrentHashMap<>();
+
+ protected abstract Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+ protected Class<?> resolveSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ String key = tableId.identifier() + "." + splitColumn.name();
+ return splitKeyClassCache.computeIfAbsent(
+ key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+ }
+
+ protected static Object[] convertBounds(Object[] raw, Class<?> target, ObjectMapper mapper) {
+ if (raw == null) {
+ return null;
+ }
+ Object[] out = new Object[raw.length];
+ for (int i = 0; i < raw.length; i++) {
+ out[i] = convertBound(raw[i], target, mapper);
+ }
+ return out;
+ }
+
+ private static Object convertBound(Object v, Class<?> target, ObjectMapper mapper) {
+ if (v == null) {
+ return null;
+ }
+ if (target.isInstance(v)) {
+ return v;
+ }
+ String s = v.toString();
+ if (target == java.sql.Date.class) {
+ return java.sql.Date.valueOf(s);
+ }
+ if (target == java.sql.Timestamp.class) {
+ return java.sql.Timestamp.valueOf(s);
+ }
+ if (target == java.sql.Time.class) {
+ return java.sql.Time.valueOf(s);
+ }
+ return mapper.convertValue(v, target);
Review Comment:
`convertBound` builds `java.sql.Date`/`Timestamp`/`Time` values only from
inputs whose `toString()` matches the corresponding `valueOf` grammar:
`yyyy-[m]m-[d]d` for `Date.valueOf`, `yyyy-[m]m-[d]d hh:mm:ss[.fffffffff]` for
`Timestamp.valueOf`, and `hh:mm:ss` for `Time.valueOf`. The unit tests cover
only String values in exactly those formats. If FE Jackson ever serializes a
`java.sql.Timestamp` as epoch milliseconds (Jackson's default, with
`WRITE_DATES_AS_TIMESTAMPS` enabled) or as an ISO-8601 string with a `T`
separator/zone offset (when that feature is disabled or a custom serializer is
in use), `v.toString()` will not match the `Timestamp.valueOf` grammar and
this method will throw `IllegalArgumentException`, breaking the very round-trip
this fix targets. Consider routing through `mapper.convertValue` (or an
explicit `Instant`/`LocalDateTime` parse) when the string doesn't match the
JDBC literal grammar, and add a unit test that feeds the actual FE-serialized
form for Timestamp (and Date as a Long, if applicable) to lock the contract
down.
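
A minimal sketch of the fallback suggested above, for the `Timestamp` case
only. The class `LenientTimestampBounds` and its method are hypothetical names
that do not appear in the PR, and the set of accepted input forms (JDBC
literal, epoch milliseconds as a `Number`, ISO-8601 with or without an offset)
is an assumption about what FE might emit, not a confirmed contract.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical helper (not part of the PR): tolerant restore of a Timestamp bound.
final class LenientTimestampBounds {
    private LenientTimestampBounds() {}

    static Timestamp toTimestamp(Object v) {
        if (v == null) {
            return null;
        }
        if (v instanceof Timestamp) {
            return (Timestamp) v;
        }
        if (v instanceof Number) {
            // Assumption: a numeric bound is epoch milliseconds.
            return new Timestamp(((Number) v).longValue());
        }
        String s = v.toString().trim();
        try {
            // JDBC literal form: yyyy-[m]m-[d]d hh:mm:ss[.fffffffff]
            return Timestamp.valueOf(s);
        } catch (IllegalArgumentException ignored) {
            // Not a JDBC literal; try the ISO-8601 forms below.
        }
        try {
            // ISO-8601 with an offset, e.g. 2025-01-01T10:00:00Z
            return Timestamp.from(OffsetDateTime.parse(s).toInstant());
        } catch (DateTimeParseException ignored) {
            // No offset present; try a local date-time.
        }
        try {
            // ISO-8601 without an offset, read as local time like Timestamp.valueOf
            return Timestamp.valueOf(LocalDateTime.parse(s));
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Unsupported timestamp bound: " + s, e);
        }
    }
}
```

A unit test along these lines would feed each form once, so that a change in
the FE serialization format fails loudly in the test rather than at split
restore time.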
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
protected Map<TableId, TableChanges.TableChange> tableSchemas;
+ private final Map<String, Class<?>> splitKeyClassCache = new ConcurrentHashMap<>();
+
+ protected abstract Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+ protected Class<?> resolveSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ String key = tableId.identifier() + "." + splitColumn.name();
+ return splitKeyClassCache.computeIfAbsent(
+ key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+ }
Review Comment:
`splitKeyClassCache` is keyed only by `tableId.identifier() + "." +
splitColumn.name()` and never invalidated. Two concerns: (1) the same
`AbstractCdcSourceReader` instance, if reused across jobs pointing at different
data sources that happen to share `schema.table.column` names (a realistic
scenario for tenanted/staging vs production), will silently return the class
probed from the first job's JDBC URL — leading to the same kind of "wrong
restored type" bug this fix is intended to prevent. (2) After a DDL that alters
the split key column's type, the stale class will keep being used for the
lifetime of the reader. Consider including a datasource identifier (e.g.,
jdbc_url) in the cache key, and/or invalidating the entry on schema-change
events.
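
One possible shape for such a cache, sketched with plain strings so it stands
alone. `SplitKeyClassCache`, `resolve`, and `invalidateTable` are hypothetical
names, and using the JDBC URL as the datasource identifier is an assumption;
the PR's actual cache is the `Map<String, Class<?>>` shown in the hunk above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: cache keyed by (datasource, table, column) with invalidation.
final class SplitKeyClassCache {

    private static final class Key {
        final String dataSourceId; // assumption: e.g. the job's jdbc_url
        final String tableId;
        final String column;

        Key(String dataSourceId, String tableId, String column) {
            this.dataSourceId = dataSourceId;
            this.tableId = tableId;
            this.column = column;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return dataSourceId.equals(other.dataSourceId)
                    && tableId.equals(other.tableId)
                    && column.equals(other.column);
        }

        @Override
        public int hashCode() {
            return Objects.hash(dataSourceId, tableId, column);
        }
    }

    private final Map<Key, Class<?>> cache = new ConcurrentHashMap<>();

    // Runs the probe only on a miss for this exact (datasource, table, column).
    Class<?> resolve(String dataSourceId, String tableId, String column,
                     Supplier<Class<?>> probe) {
        return cache.computeIfAbsent(
                new Key(dataSourceId, tableId, column), k -> probe.get());
    }

    // Drops every cached column class of one table, e.g. after a schema change.
    void invalidateTable(String dataSourceId, String tableId) {
        cache.keySet().removeIf(
                k -> k.dataSourceId.equals(dataSourceId) && k.tableId.equals(tableId));
    }
}
```

With this layout, two jobs that share `schema.table.column` but point at
different URLs each run their own probe, and a schema-change handler can call
`invalidateTable` so the next lookup probes again.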
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -614,16 +614,25 @@ protected abstract Fetcher<SourceRecords, SourceSplitBase> getBinlogSplitReader(
createSnapshotSplit(Map<String, Object> offset, JobBaseConfig jobConfig) {
SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, SnapshotSplit.class);
TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
List<String> splitKeys = snapshotSplit.getSplitKey();
Map<TableId, TableChanges.TableChange> tableSchemas = getTableSchemas(jobConfig);
TableChanges.TableChange tableChange = tableSchemas.get(tableId);
Preconditions.checkNotNull(
tableChange, "Can not find table " + tableId + " in job " + jobConfig.getJobId());
// only support one split key
String splitKey = splitKeys.get(0);
- io.debezium.relational.Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+ Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+ Preconditions.checkNotNull(
+ splitColumn,
+ "Split key column "
+ + splitKey
+ + " not found in table "
+ + tableId
+ + " for job "
+ + jobConfig.getJobId());
+ Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, jobConfig);
+ Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), keyClass, objectMapper);
+ Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), keyClass, objectMapper);
Review Comment:
Near-identical `Preconditions.checkNotNull(splitColumn, "Split key column
…")` / `Preconditions.checkNotNull(tableChange, …)` blocks were added at four
call sites: `JdbcIncrementalSourceReader.createSnapshotSplit`,
`JdbcIncrementalSourceReader.createStreamSplit`,
`MySqlSourceReader.createSnapshotSplit`, and
`MySqlSourceReader.createBinlogSplit`. Consider extracting a small helper on
`AbstractCdcSourceReader` (e.g. `resolveSplitColumn(TableId, String splitKey,
Map<...> schemas, JobBaseConfig)`) that performs both null checks plus the
`resolveSplitKeyClass` lookup and returns the column and class together. This
would shrink the diff, keep the error messages consistent if they ever need to
change, and remove the risk of one of the four copies drifting from the others.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -366,6 +369,26 @@ protected DataType fromDbzColumn(Column splitColumn) {
return PostgresTypeUtils.fromDbzColumn(splitColumn);
}
+ @Override
+ protected Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+ String sql =
+ String.format(
+ "SELECT %s FROM %s WHERE 1=0",
+ PostgresQueryUtils.quote(splitColumn.name()),
+ PostgresQueryUtils.quote(tableId));
+ try (JdbcConnection jdbc =
+ new PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig);
+ Statement st = jdbc.connection().createStatement();
+ ResultSet rs = st.executeQuery(sql)) {
+ return Class.forName(rs.getMetaData().getColumnClassName(1));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Probe split key class failed for " + tableId + "." + splitColumn.name(), e);
+ }
+ }
Review Comment:
`probeSplitKeyClass` opens a brand-new JDBC connection on every cache miss
(one per `table.column`). For a job with many tables this can multiply quickly,
and any transient JDBC failure (network blip, momentary lock, brief auth issue)
immediately throws `RuntimeException` and fails the whole snapshot-split
creation with no retry — by design per the PR description, but worth
confirming. Consider at least one retry with backoff inside
`probeSplitKeyClass` to avoid making well-known transient failure modes (e.g.
PG's `cannot execute ... in a read-only transaction` during failover) bubble up
as terminal job failures. The same comment applies to the analogous override in
`MySqlSourceReader.probeSplitKeyClass`.
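
A rough sketch of what a bounded retry with exponential backoff could look
like. `ProbeRetry.withRetry` is a hypothetical helper, not an existing Doris or
Debezium API, and it retries on any exception for brevity; a real version
would presumably retry only failures known to be transient.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: run an action up to maxAttempts times, doubling the wait each time.
final class ProbeRetry {
    private ProbeRetry() {}

    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Remember the failure; it becomes the cause if all attempts fail.
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while waiting to retry", ie);
                }
                backoff *= 2;
            }
        }
        throw new RuntimeException("Failed after " + maxAttempts + " attempts", last);
    }
}
```

Inside `probeSplitKeyClass`, the existing try-with-resources body could then be
passed as the `action`, so that a single failover blip costs one short wait
instead of failing the job.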
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -670,12 +680,29 @@ private Tuple2<SourceSplitBase, Boolean> createStreamSplit(
if (maxOffsetFinishSplits == null || sourceOffset.isAfter(maxOffsetFinishSplits)) {
maxOffsetFinishSplits = sourceOffset;
}
+ TableId tid = TableId.parse(split.getTableId());
Review Comment:
Inconsistent use of `TableId.parse`:
`JdbcIncrementalSourceReader.createSnapshotSplit` (line 616) calls
`TableId.parse(snapshotSplit.getTableId(), false)`, while the new code in
`createStreamSplit` (line 683) and the `MySqlSourceReader` paths call the
single-arg `TableId.parse(split.getTableId())`. In Debezium the boolean is
`useCatalogBeforeSchema` (the single-arg overload passes `true`), so the two
forms can assign the parts of a two-part identifier differently (schema vs.
catalog). Since `resolveSplitKeyClass` keys its cache by
`tableId.identifier()`, any disagreement between the snapshot and stream paths
on how the identifier is parsed could produce two different cache keys for the
same table, doubling the probe and (more importantly) hiding a potential bug.
Please align the parse mode across all four sites.
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy:
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_date_pk", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_date_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableDate = "events_pg_date_pk"
+ def tableComposite = "events_pg_date_id_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableDate} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} (
+ "event_date" date PRIMARY KEY,
+ "payload" varchar(200)
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-01', 'A1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-02', 'B1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-03', 'C1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-04', 'D1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-05', 'E1');"""
+
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableComposite}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+ "event_date" date NOT NULL,
+ "id" int4 NOT NULL,
+ "payload" varchar(200),
+ PRIMARY KEY ("event_date", "id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${tableDate},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1",
+ "snapshot_parallelism" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ jobSuccendCount.size() == 1 && '5' <= jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+ qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1');"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02';"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE event_date = '2025-01-01';"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1;"""
+ }
+
+ sleep(60000)
+
Review Comment:
A 60-second hard `sleep` after issuing CDC DML and before asserting the
`qt_select_binlog_*` snapshots is brittle: on a slow CI host the binlog/stream
phase may not yet have propagated all events, producing flaky failures. The
earlier `Awaitility.await()` pattern used for the snapshot phase scales much
better; consider polling on the expected row count (or job state) instead of a
fixed sleep. The same applies to the analogous test in
`test_streaming_mysql_job_date_pk.groovy`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]