github-actions[bot] commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3007796579


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,24 @@ public void beforeCommitted(TransactionState txnState) 
throws TransactionExcepti
             }
             LoadJob loadJob = loadJobs.get(0);
             LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+            String offsetJson = offsetProvider.getCommitOffsetJson(
+                    runningStreamTask.getRunningOffset(),

Review Comment:
   **[High] RPC under write lock.** `getCommitOffsetJson()` is called while the 
job's write lock is held (acquired at `writeLock()` above). For 
`JdbcTvfSourceOffsetProvider`, this calls `fetchTaskEndOffset()` which performs 
an RPC to BE via `BackendServiceProxy.requestCdcClient()`. If the BE is slow, 
unreachable, or the RPC times out, this will block all other threads waiting 
for the job's write lock.
   
   Note: the pre-existing `shouldReleaseLock = false` bug means this lock is 
never released in this method, making the issue even worse — the lock will be 
permanently held by the txn commit thread.
   
   Consider moving the RPC call **before** acquiring the write lock, or 
restructuring `beforeCommitted()` to only hold the lock for the attachment 
assignment.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        if (this.jobId != null) {
+            return;

Review Comment:
   **[Medium] Non-thread-safe initialization guard.** `this.jobId` is not 
`volatile`, and there is no synchronization around this check-then-act pattern. 
While concurrent calls may be unlikely in practice, this violates Java Memory 
Model visibility guarantees — a thread could see a stale `null` for `jobId` 
even after another thread set it.
   
   Consider either:
   1. Making `jobId` volatile, or
   2. Using `synchronized` around the initialization block, or
   3. Using `AtomicReference<Long>` with `compareAndSet`.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,24 @@ public void beforeCommitted(TransactionState txnState) 
throws TransactionExcepti
             }
             LoadJob loadJob = loadJobs.get(0);
             LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+            String offsetJson = offsetProvider.getCommitOffsetJson(
+                    runningStreamTask.getRunningOffset(),
+                    runningStreamTask.getTaskId(),
+                    runningStreamTask.getScanBackendIds());
+
+
+            if (StringUtils.isBlank(offsetJson)) {
+                throw new TransactionException("Can not found offset for 
attachment, load job id is " + runningStreamTask.getTaskId());
+            }

Review Comment:
   **[Nit] Grammar.** "Can not found offset" should be "Cannot find offset" or 
"Offset not found".



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        if (this.jobId != null) {
+            return;
+        }
+        this.jobId = jobId;
+        this.sourceProperties = originTvfProps;
+        this.chunkHighWatermarkMap = new HashMap<>();
+        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+        Preconditions.checkArgument(type != null, "type is required");
+        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+        this.snapshotParallelism = Integer.parseInt(
+                
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+        Preconditions.checkArgument(table != null, "table is required for 
cdc_stream TVF");
+    }
+
+    /**
+     * Called once on fresh job creation (not on FE restart).
+     * Fetches snapshot splits from BE and persists them to the meta table.
+     */
+    @Override
+    public void initOnCreate() throws JobException {
+        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+        splitChunks(Collections.singletonList(table));
+    }
+
+    /**
+     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+     * so the BE knows where to start reading and can report
+     * the end offset back via taskOffsetCache.
+     */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset runningOffset, long taskId) {
+        JdbcOffset offset = (JdbcOffset) runningOffset;
+        Map<String, String> props = new HashMap<>();
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan 
-> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                props.putAll(originTvfRel.getProperties().getMap());
+                props.put(CdcStreamTableValuedFunction.META_KEY, new 
Gson().toJson(offset.generateMeta()));
+                props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, 
String.valueOf(jobId));
+                props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, 
String.valueOf(taskId));
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), 
originTvfRel.getFunctionName(), new Properties(props));
+            }
+            return plan;
+        });
+        InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) 
rewritePlan,
+                Optional.empty(), Optional.empty(), Optional.empty(), true, 
Optional.empty());
+        cmd.setJobId(originCommand.getJobId());
+        return cmd;
+    }
+
+    /**
+     * Returns the serialized JSON offset to store in txn commit attachment.
+     *
+     * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset 
recorded after
+     * fetchRecordStream completes (stored in 
PipelineCoordinator.taskOffsetCache).
+     *
+     * <p>For cloud + snapshot: returns cumulative list (all previously 
completed chunks +
+     * current task's new splits) so that replayOnCloudMode can recover full 
state from latest attachment.
+     * For non-cloud snapshot / binlog: returns only current task's splits.
+     */
+    @Override
+    public String getCommitOffsetJson(Offset runningOffset, long taskId, 
List<Long> scanBackendIds) {
+        List<Map<String, String>> currentTaskEndOffset = 
fetchTaskEndOffset(taskId, scanBackendIds);
+        if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+            return "";
+        }
+
+        // Cloud + snapshot: prepend all previously completed chunks so the 
attachment is
+        // self-contained for replayOnCloudMode (MS only keeps the latest 
attachment)
+        if (Config.isCloudMode() && ((JdbcOffset) 
runningOffset).snapshotSplit()) {
+            List<Map<String, String>> cumulative = 
buildCumulativeSnapshotOffset(currentTaskEndOffset);
+            return new Gson().toJson(cumulative);
+        }
+        return new Gson().toJson(currentTaskEndOffset);
+    }
+
+    /**
+     * Queries each scan backend in order until one returns a non-empty offset 
for this taskId.
+     * Only the BE that ran the cdc_stream TVF scan node will have the entry 
in taskOffsetCache.
+     */
+    private List<Map<String, String>> fetchTaskEndOffset(long taskId, 
List<Long> scanBackendIds) {
+        InternalService.PRequestCdcClientRequest request =
+                InternalService.PRequestCdcClientRequest.newBuilder()
+                        .setApi("/api/getTaskOffset/" + taskId).build();
+        for (Long beId : scanBackendIds) {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                log.info("Backend {} not found for task {}, skipping", beId, 
taskId);
+                continue;
+            }
+            String rawResponse = null;
+            try {
+                TNetworkAddress address = new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+                Future<PRequestCdcClientResult> future =
+                        
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+                InternalService.PRequestCdcClientResult result = future.get();
+                TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    log.warn("Failed to get task {} offset from BE {}: {}", 
taskId,
+                            backend.getHost(), 
result.getStatus().getErrorMsgs(0));
+                    continue;
+                }
+                rawResponse = result.getResponse();
+                ResponseBody<List<Map<String, String>>> responseObj = 
OBJECT_MAPPER.readValue(
+                        rawResponse,
+                        new TypeReference<ResponseBody<List<Map<String, 
String>>>>() {});
+                List<Map<String, String>> data = responseObj.getData();
+                if (!CollectionUtils.isEmpty(data)) {
+                    log.info("Fetched task {} offset from BE {}: {}", taskId, 
backend.getHost(), data);
+                    return data;
+                }
+            } catch (Exception ex) {
+                log.warn("Get task offset error for task {} from BE {}, raw 
response: {}",
+                        taskId, backend.getHost(), rawResponse, ex);
+                throw new RuntimeException(ex);
+            }

Review Comment:
   **[High] Unchecked RuntimeException in transaction path.** 
`fetchTaskEndOffset` wraps all exceptions in `RuntimeException` and throws. 
This is called from `getCommitOffsetJson()` which is called from 
`beforeCommitted()` (declared `throws TransactionException`). The 
`RuntimeException` will propagate unchecked through the transaction commit path 
and may not be handled correctly by the transaction framework, potentially 
leaving the transaction in an inconsistent state.
   
   Consider either:
   1. Returning `Collections.emptyList()` on error (like the `code != 
TStatusCode.OK` branch does), or
   2. Converting to a checked exception that `getCommitOffsetJson` can declare.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        if (this.jobId != null) {
+            return;
+        }
+        this.jobId = jobId;
+        this.sourceProperties = originTvfProps;
+        this.chunkHighWatermarkMap = new HashMap<>();
+        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+        Preconditions.checkArgument(type != null, "type is required");
+        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+        this.snapshotParallelism = Integer.parseInt(
+                
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+        Preconditions.checkArgument(table != null, "table is required for 
cdc_stream TVF");
+    }
+
+    /**
+     * Called once on fresh job creation (not on FE restart).
+     * Fetches snapshot splits from BE and persists them to the meta table.
+     */
+    @Override
+    public void initOnCreate() throws JobException {
+        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+        splitChunks(Collections.singletonList(table));
+    }
+
+    /**
+     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+     * so the BE knows where to start reading and can report
+     * the end offset back via taskOffsetCache.
+     */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset runningOffset, long taskId) {
+        JdbcOffset offset = (JdbcOffset) runningOffset;
+        Map<String, String> props = new HashMap<>();
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan 
-> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                props.putAll(originTvfRel.getProperties().getMap());
+                props.put(CdcStreamTableValuedFunction.META_KEY, new 
Gson().toJson(offset.generateMeta()));
+                props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, 
String.valueOf(jobId));
+                props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, 
String.valueOf(taskId));
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), 
originTvfRel.getFunctionName(), new Properties(props));
+            }
+            return plan;
+        });
+        InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) 
rewritePlan,
+                Optional.empty(), Optional.empty(), Optional.empty(), true, 
Optional.empty());
+        cmd.setJobId(originCommand.getJobId());
+        return cmd;
+    }
+
+    /**
+     * Returns the serialized JSON offset to store in txn commit attachment.
+     *
+     * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset 
recorded after
+     * fetchRecordStream completes (stored in 
PipelineCoordinator.taskOffsetCache).
+     *
+     * <p>For cloud + snapshot: returns cumulative list (all previously 
completed chunks +
+     * current task's new splits) so that replayOnCloudMode can recover full 
state from latest attachment.
+     * For non-cloud snapshot / binlog: returns only current task's splits.
+     */
+    @Override
+    public String getCommitOffsetJson(Offset runningOffset, long taskId, 
List<Long> scanBackendIds) {
+        List<Map<String, String>> currentTaskEndOffset = 
fetchTaskEndOffset(taskId, scanBackendIds);
+        if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+            return "";
+        }
+
+        // Cloud + snapshot: prepend all previously completed chunks so the 
attachment is
+        // self-contained for replayOnCloudMode (MS only keeps the latest 
attachment)
+        if (Config.isCloudMode() && ((JdbcOffset) 
runningOffset).snapshotSplit()) {
+            List<Map<String, String>> cumulative = 
buildCumulativeSnapshotOffset(currentTaskEndOffset);
+            return new Gson().toJson(cumulative);
+        }
+        return new Gson().toJson(currentTaskEndOffset);
+    }
+
+    /**
+     * Queries each scan backend in order until one returns a non-empty offset 
for this taskId.
+     * Only the BE that ran the cdc_stream TVF scan node will have the entry 
in taskOffsetCache.
+     */
+    private List<Map<String, String>> fetchTaskEndOffset(long taskId, 
List<Long> scanBackendIds) {
+        InternalService.PRequestCdcClientRequest request =
+                InternalService.PRequestCdcClientRequest.newBuilder()
+                        .setApi("/api/getTaskOffset/" + taskId).build();
+        for (Long beId : scanBackendIds) {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                log.info("Backend {} not found for task {}, skipping", beId, 
taskId);
+                continue;
+            }
+            String rawResponse = null;
+            try {
+                TNetworkAddress address = new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+                Future<PRequestCdcClientResult> future =
+                        
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+                InternalService.PRequestCdcClientResult result = future.get();
+                TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    log.warn("Failed to get task {} offset from BE {}: {}", 
taskId,
+                            backend.getHost(), 
result.getStatus().getErrorMsgs(0));
+                    continue;
+                }
+                rawResponse = result.getResponse();
+                ResponseBody<List<Map<String, String>>> responseObj = 
OBJECT_MAPPER.readValue(
+                        rawResponse,
+                        new TypeReference<ResponseBody<List<Map<String, 
String>>>>() {});
+                List<Map<String, String>> data = responseObj.getData();
+                if (!CollectionUtils.isEmpty(data)) {
+                    log.info("Fetched task {} offset from BE {}: {}", taskId, 
backend.getHost(), data);
+                    return data;
+                }
+            } catch (Exception ex) {
+                log.warn("Get task offset error for task {} from BE {}, raw 
response: {}",
+                        taskId, backend.getHost(), rawResponse, ex);
+                throw new RuntimeException(ex);
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Merges existing chunkHighWatermarkMap (all previously completed chunks) 
with
+     * current task's new splits, deduplicating by splitId.
+     */
+    private List<Map<String, String>> buildCumulativeSnapshotOffset(
+            List<Map<String, String>> currentTaskSplits) {
+        Set<String> currentSplitIds = currentTaskSplits.stream()
+                .map(m -> m.get(SPLIT_ID)).collect(Collectors.toSet());
+
+        List<Map<String, String>> result = new ArrayList<>();
+        // Add all previously completed chunks (skip any that overlap with 
current task)
+        if (MapUtils.isNotEmpty(chunkHighWatermarkMap)) {
+            for (Map.Entry<String, Map<String, Map<String, String>>> tableEntry
+                    : chunkHighWatermarkMap.entrySet()) {
+                for (Map.Entry<String, Map<String, String>> splitEntry
+                        : tableEntry.getValue().entrySet()) {
+                    if (!currentSplitIds.contains(splitEntry.getKey())) {
+                        Map<String, String> map = new 
HashMap<>(splitEntry.getValue());
+                        map.put(SPLIT_ID, splitEntry.getKey());
+                        result.add(map);
+                    }
+                }
+            }
+        }
+        result.addAll(currentTaskSplits);
+        return result;
+    }
+
+    /**
+     * TVF path updateOffset.
+     *
+     * <p>Snapshot: always writes to chunkHighWatermarkMap (needed for cloud 
cumulative attachment
+     * and FE-restart recovery). In normal flow removeIf finds the split in 
remainingSplits and
+     * adds it to finishedSplits. During txn replay remainingSplits is empty 
so removeIf returns
+     * false naturally — chunkHighWatermarkMap is still updated for 
replayIfNeed to use later.
+     *
+     * <p>Binlog: currentOffset is set above; no extra state needed for TVF 
recovery path.
+     */
+    @Override
+    public void updateOffset(Offset offset) {
+        this.currentOffset = (JdbcOffset) offset;
+        if (currentOffset.snapshotSplit()) {
+            for (AbstractSourceSplit split : currentOffset.getSplits()) {
+                SnapshotSplit ss = (SnapshotSplit) split;
+                boolean removed = remainingSplits.removeIf(v -> {
+                    if (v.getSplitId().equals(ss.getSplitId())) {
+                        ss.setTableId(v.getTableId());
+                        ss.setSplitKey(v.getSplitKey());
+                        ss.setSplitStart(v.getSplitStart());
+                        ss.setSplitEnd(v.getSplitEnd());
+                        return true;
+                    }
+                    return false;
+                });
+                if (removed) {
+                    finishedSplits.add(ss);
+                }
+                chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> 
new HashMap<>())
+                        .put(ss.getSplitId(), ss.getHighWatermark());
+            }
+        }
+        // Binlog: currentOffset is already set; no binlogOffsetPersist needed 
for TVF path.
+    }
+
+    /**
+     * TVF path recovery: offsetProviderPersist is always null (no EditLog 
write).
+     * currentOffset is set by replayOnCommitted/replayOnCloudMode -> 
updateOffset before this runs.
+     *
+     * <ul>
+     *   <li>snapshot: mid-snapshot restart — rebuild remainingSplits from 
meta + chunkHighWatermarkMap</li>
+     *   <li>binlog: currentOffset already correct from updateOffset; nothing 
to do</li>
+     * </ul>
+     */
+    @Override
+    public void replayIfNeed(StreamingInsertJob job) throws JobException {
+        if (currentOffset == null) {
+            log.info("Replaying TVF offset provider for job {}: no committed 
txn, skip", getJobId());
+            return;
+        }
+        if (currentOffset.snapshotSplit()) {
+            log.info("Replaying TVF offset provider for job {}: restoring 
snapshot state from txn replay",
+                    getJobId());
+            Map<String, List<SnapshotSplit>> snapshotSplits = 
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            if (MapUtils.isNotEmpty(snapshotSplits)) {
+                List<SnapshotSplit> lastSnapshotSplits =
+                        recalculateRemainingSplits(chunkHighWatermarkMap, 
snapshotSplits);
+                if (remainingSplits.isEmpty()) {
+                    if (!lastSnapshotSplits.isEmpty()) {
+                        currentOffset = new JdbcOffset(lastSnapshotSplits);
+                    } else if (!isSnapshotOnlyMode()) {
+                        BinlogSplit binlogSplit = new BinlogSplit();
+                        binlogSplit.setFinishedSplits(finishedSplits);
+                        currentOffset = new 
JdbcOffset(Collections.singletonList(binlogSplit));
+                    }
+                }
+            }
+        } else {
+            log.info("Replaying TVF offset provider for job {}: binlog offset 
already set, nothing to do",
+                    getJobId());
+        }
+    }
+
+    /**
+     * Builds the chunkHighWatermarkMap outer key as schema.table (if schema 
is present)
+     * or database.table, matching the format used by snapshotSplits keys in
+     * recalculateRemainingSplits.
+     */
+    private String buildTableKey() {
+        String schema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
+        String qualifier = (schema != null && !schema.isEmpty())
+                ? schema : sourceProperties.get(DataSourceConfigKeys.DATABASE);
+        return qualifier + "." + 
sourceProperties.get(DataSourceConfigKeys.TABLE);
+    }
+
+    /**
+     * TVF path does not persist to EditLog; state is recovered via txn replay.
+     * This override is defensive — the persistOffsetProviderIfNeed() call path
+     * only runs in the non-TVF commitOffset flow and won't reach here.
+     */
+    @Override
+    public String getPersistInfo() {
+        return null;
+    }
+
+    @Override
+    public void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) {
+        if (!(runningOffset instanceof JdbcOffset) || !(endOffset instanceof 
JdbcOffset)) {
+            return;
+        }
+        JdbcOffset running = (JdbcOffset) runningOffset;
+        JdbcOffset end = (JdbcOffset) endOffset;
+        if (running.snapshotSplit()) {
+            for (int i = 0; i < running.getSplits().size() && i < 
end.getSplits().size(); i++) {
+                SnapshotSplit rSplit = (SnapshotSplit) 
running.getSplits().get(i);
+                SnapshotSplit eSplit = (SnapshotSplit) end.getSplits().get(i);
+                rSplit.setHighWatermark(eSplit.getHighWatermark());
+            }
+        } else {
+            BinlogSplit rSplit = (BinlogSplit) running.getSplits().get(0);
+            BinlogSplit eSplit = (BinlogSplit) end.getSplits().get(0);

Review Comment:
   **[Medium] Potential IndexOutOfBoundsException.** 
`running.getSplits().get(0)` and `end.getSplits().get(0)` will throw 
`IndexOutOfBoundsException` if the splits list is empty. The snapshot branch 
correctly uses `size()` bounds, but the binlog branch does not check for empty 
lists. Consider adding a guard:
   ```java
   if (running.getSplits().isEmpty() || end.getSplits().isEmpty()) {
       return;
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -51,24 +62,34 @@ private void processProps(Map<String, String> properties) 
throws AnalysisExcepti
         Map<String, String> copyProps = new HashMap<>(properties);
         copyProps.put("format", "json");
         super.parseCommonProperties(copyProps);
-        this.processedParams.put("enable_cdc_client", "true");
-        this.processedParams.put("uri", URI);
-        this.processedParams.put("http.enable.range.request", "false");
-        this.processedParams.put("http.enable.chunk.response", "true");
-        this.processedParams.put("http.method", "POST");
+        this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true");
+        this.processedParams.put(URI_KEY, URI);
+        this.processedParams.put(HTTP_ENABLE_RANGE_REQUEST_KEY, "false");
+        this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true");
+        this.processedParams.put(HTTP_METHOD_KEY, "POST");
 
         String payload = generateParams(properties);
-        this.processedParams.put("http.payload", payload);
+        this.processedParams.put(HTTP_PAYLOAD_KEY, payload);
         this.backendConnectProperties.putAll(processedParams);
         generateFileStatus();
     }
 
     private String generateParams(Map<String, String> properties) throws 
AnalysisException {
         FetchRecordRequest recordRequest = new FetchRecordRequest();
-        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        String defaultJobId = UUID.randomUUID().toString().replace("-", "");
+        recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY, 
defaultJobId));
         recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
         recordRequest.setConfig(properties);
         try {
+            // for tvf with job
+            if (properties.containsKey(TASK_ID_KEY)) {
+                recordRequest.setTaskId(properties.remove(TASK_ID_KEY));
+                String meta = properties.remove(META_KEY);

Review Comment:
   **[Medium] Mutating input map via `properties.remove()`.** `properties` is 
the original map passed into the constructor. `remove(TASK_ID_KEY)` and 
`remove(META_KEY)` below mutate this map in-place. Since 
`recordRequest.setConfig(properties)` was called above (line 82), the config 
reference shares the same map — so these keys are also removed from the request 
config. This appears intentional (to avoid sending them to cdc_client), but:
   
   1. The caller's map is also mutated as a side effect
   2. This is fragile — if `setConfig` ever copies the map, the removal won't 
affect the config
   
   Consider using a copy of the map for the config, and explicitly removing 
these keys from the copy instead.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ *   <li>offset commit: FE pulls actual end offset from BE via 
/api/getTaskOffset/{taskId} in
+ *       beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ *   <li>cloud mode snapshot: attachment carries cumulative 
chunkHighWatermarkMap so that
+ *       replayOnCloudMode can recover full state from the single latest 
attachment in MS</li>
+ *   <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap 
populated by
+ *       replayOnCommitted/replayOnCloudMode -> updateOffset), not from 
EditLog</li>
+ *   <li>updateOffset: during replay remainingSplits is empty so removeIf 
returns false naturally;
+ *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
+ *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
+ *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by 
SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> 
originTvfProps) throws JobException {
+        if (this.jobId != null) {
+            return;
+        }
+        this.jobId = jobId;
+        this.sourceProperties = originTvfProps;
+        this.chunkHighWatermarkMap = new HashMap<>();
+        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+        Preconditions.checkArgument(type != null, "type is required");
+        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+        this.snapshotParallelism = Integer.parseInt(
+                
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+        Preconditions.checkArgument(table != null, "table is required for 
cdc_stream TVF");
+    }
+
+    /**
+     * Called once on fresh job creation (not on FE restart).
+     * Fetches snapshot splits from BE and persists them to the meta table.
+     */
+    @Override
+    public void initOnCreate() throws JobException {
+        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+        splitChunks(Collections.singletonList(table));
+    }
+
+    /**
+     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+     * so the BE knows where to start reading and can report
+     * the end offset back via taskOffsetCache.
+     */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand 
originCommand, Offset runningOffset, long taskId) {
+        JdbcOffset offset = (JdbcOffset) runningOffset;

Review Comment:
   **[Nit] Line length.** This line exceeds the typical 120-character line 
length limit. Consider breaking the parameter list across multiple lines for 
readability.



##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1), UPDATE (C1), DELETE (D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_postgres_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"             = "${pgSchema}",
+                "table"              = "${pgTable}",
+                "offset"             = "initial"
+            )
+        """
+
+        // wait for at least one snapshot task to succeed
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // verify snapshot data
+        qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // insert incremental rows in PostgreSQL
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('C1', 3)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('D1', 4)"""
+        }
+
+        // wait for binlog tasks to pick up the new rows
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+                log.info("incremental rows: " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        // sql """drop table if exists ${currentDb}.${dorisTable} force"""

Review Comment:
   **[Medium] Cleanup commented out.** Per Doris regression test standards, 
tables should be dropped **before** use (not after) to preserve the environment 
for debugging. This is correctly done at lines 40-41. However, these 
commented-out cleanup lines at the end suggest the test was intended to clean 
up but the code was left commented out. Either remove these commented-out lines 
entirely (since drop-before-use is the convention), or uncomment them if they 
serve a purpose.



##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1), UPDATE (C1), DELETE (D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_postgres_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"             = "${pgSchema}",
+                "table"              = "${pgTable}",
+                "offset"             = "initial"
+            )
+        """
+
+        // wait for at least one snapshot task to succeed
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // verify snapshot data
+        qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+

Review Comment:
   **[Low] Test description mismatch.** The suite comment (line 28) mentions 
"UPDATE (C1), DELETE (D1) are applied" but the test only performs INSERTs (C1, 
D1) and the expected output `!final_data` includes both rows. If UPDATE/DELETE 
are part of the intended test scenario, they are missing. If not, the comment 
should be corrected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to