Copilot commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3008454384
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,25 @@ public void beforeCommitted(TransactionState txnState)
throws TransactionExcepti
}
LoadJob loadJob = loadJobs.get(0);
LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+ String offsetJson = offsetProvider.getCommitOffsetJson(
+ runningStreamTask.getRunningOffset(),
+ runningStreamTask.getTaskId(),
+ runningStreamTask.getScanBackendIds());
+
+
+ if (StringUtils.isBlank(offsetJson)) {
+ throw new TransactionException("Cannot find offset for
attachment, load job id is "
+ + runningStreamTask.getTaskId());
Review Comment:
The exception message says "load job id" but the value appended is
`runningStreamTask.getTaskId()`. Please clarify the wording (taskId vs
loadJobId) to avoid confusion when diagnosing commit failures.
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ * <li>offset commit: FE pulls actual end offset from BE via
/api/getTaskOffset/{taskId} in
+ * beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ * <li>cloud mode snapshot: attachment carries cumulative
chunkHighWatermarkMap so that
+ * replayOnCloudMode can recover full state from the single latest
attachment in MS</li>
+ * <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap
populated by
+ * replayOnCommitted/replayOnCloudMode -> updateOffset), not from
EditLog</li>
+ * <li>updateOffset: during replay remainingSplits is empty so removeIf
returns false naturally;
+ * chunkHighWatermarkMap is always updated unconditionally to support
recovery</li>
+ * <li>replayIfNeed: checks currentOffset directly — snapshot triggers
remainingSplits rebuild
+ * from meta + chunkHighWatermarkMap; binlog needs no action
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * No-arg constructor required by
SourceOffsetProviderFactory.createSourceOffsetProvider().
+ */
+ public JdbcTvfSourceOffsetProvider() {
+ super();
+ }
+
+ /**
+ * Initializes provider state and fetches snapshot splits from BE.
+ * splitChunks is called here (rather than in StreamingInsertJob) to keep
+ * all cdc_stream-specific init logic inside the provider.
+ */
+ @Override
+ public void ensureInitialized(Long jobId, Map<String, String>
originTvfProps) throws JobException {
+ if (this.jobId != null) {
+ return;
+ }
+ this.jobId = jobId;
+ this.sourceProperties = originTvfProps;
+ this.chunkHighWatermarkMap = new HashMap<>();
Review Comment:
ensureInitialized(...) returns early once jobId is set, so later calls
(e.g., after ALTER JOB updates the TVF SQL/properties) won’t refresh provider
fields like sourceProperties/snapshotParallelism/sourceType. This can leave
buildTableKey()/split selection operating on stale schema/table metadata.
Consider making this method idempotent while still refreshing the derived
fields whenever originTvfProps changes, and guarding only the truly one-time
remote work in initOnCreate().
```suggestion
*
* <p>Idempotent: jobId and watermark map are initialized once, while
* sourceProperties and other derived fields are refreshed on every call
* to reflect the latest TVF properties (e.g., after ALTER JOB).
*/
@Override
public void ensureInitialized(Long jobId, Map<String, String>
originTvfProps) throws JobException {
// One-time initialization tied to the lifecycle of this provider
instance.
if (this.jobId == null) {
this.jobId = jobId;
if (this.chunkHighWatermarkMap == null) {
this.chunkHighWatermarkMap = new HashMap<>();
}
}
// Always refresh fields derived from the current TVF properties so
that
// later calls (e.g., after ALTER JOB) do not operate on stale
metadata.
this.sourceProperties = originTvfProps;
```
##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ * 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ * 2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_postgres_name"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // prepare source table with pre-existing snapshot data
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ }
+
+ // create streaming job via cdc_stream TVF (offset=initial → snapshot
then binlog)
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial"
+ )
+ """
+
+ // wait for at least one snapshot task to succeed
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+ log.info("SucceedTaskCount: " + cnt)
+ cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
Review Comment:
Waiting for `SucceedTaskCount >= 2` is brittle here because the number of
successful tasks depends on how many snapshot splits are generated/scheduled
(and could legitimately be 1 for small tables). For better test stability, wait
on an invariant signal (e.g., target table contains the snapshot rows, or job
status reaches RUNNING with at least 1 successful task).
```suggestion
// wait for snapshot data (A1, B1) to be fully synced into the
target table
try {
Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('A1', 'B1')"""
log.info("snapshot rows: " + rows)
(rows.get(0).get(0) as int) == 2
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]