Copilot commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3008454384


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,25 @@ public void beforeCommitted(TransactionState txnState) 
throws TransactionExcepti
             }
             LoadJob loadJob = loadJobs.get(0);
             LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+            String offsetJson = offsetProvider.getCommitOffsetJson(
+                    runningStreamTask.getRunningOffset(),
+                    runningStreamTask.getTaskId(),
+                    runningStreamTask.getScanBackendIds());
+
+
+            if (StringUtils.isBlank(offsetJson)) {
+                throw new TransactionException("Cannot find offset for 
attachment, load job id is "
+                        + runningStreamTask.getTaskId());

Review Comment:
   The exception message says "load job id" but the value appended is 
`runningStreamTask.getTaskId()`, which is a task id, not a load job id. 
Please make the wording match the value — e.g., "Cannot find offset for 
attachment, task id is " — (or append the actual load job id instead) so the 
message is not misleading when diagnosing commit failures.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        if (this.jobId != null) {
+            return;
+        }
+        this.jobId = jobId;
+        this.sourceProperties = originTvfProps;
+        this.chunkHighWatermarkMap = new HashMap<>();

Review Comment:
   ensureInitialized(...) returns early once jobId is set, so later calls 
(e.g., after ALTER JOB updates the TVF SQL/properties) won’t refresh provider 
fields like sourceProperties/snapshotParallelism/sourceType. This can leave 
buildTableKey()/split selection operating on stale schema/table metadata. 
Consider making this method idempotent but still update fields when 
originTvfProps changes, and only guard truly one-time remote work in 
initOnCreate().
   ```suggestion
        *
        * <p>Idempotent: jobId and watermark map are initialized once, while
        * sourceProperties and other derived fields are refreshed on every call
        * to reflect the latest TVF properties (e.g., after ALTER JOB).
        */
       @Override
       public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
           // One-time initialization tied to the lifecycle of this provider 
instance.
           if (this.jobId == null) {
               this.jobId = jobId;
               if (this.chunkHighWatermarkMap == null) {
                   this.chunkHighWatermarkMap = new HashMap<>();
               }
           }
   
           // Always refresh fields derived from the current TVF properties so 
that
           // later calls (e.g., after ALTER JOB) do not operate on stale 
metadata.
           this.sourceProperties = originTvfProps;
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_postgres_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"             = "${pgSchema}",
+                "table"              = "${pgTable}",
+                "offset"             = "initial"
+            )
+        """
+
+        // wait for at least one snapshot task to succeed
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2

Review Comment:
   Waiting for `SucceedTaskCount >= 2` is brittle here because the number of 
successful tasks depends on how many snapshot splits are generated/scheduled 
(and could legitimately be 1 for small tables). For better test stability, wait 
on an invariant signal (e.g., target table contains the snapshot rows, or job 
status reaches RUNNING with at least 1 successful task).
   ```suggestion
           // wait for snapshot data (A1, B1) to be fully synced into the 
target table
           try {
               Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
                   def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('A1', 'B1')"""
                   log.info("snapshot rows: " + rows)
                   (rows.get(0).get(0) as int) == 2
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to