This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d55a551627 [improve](streaming-job) Bind the incremental phase to a fixed BE and reuse the cdc reader (#64423)
9d55a551627 is described below

commit 9d55a551627b859ba602cb0287ef170cb5c2e39c
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 19:57:17 2026 +0800

    [improve](streaming-job) Bind the incremental phase to a fixed BE and reuse the cdc reader (#64423)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    For from-to (MySQL/PG CDC) streaming jobs, once a job enters the
    incremental (binlog) phase, two issues hurt throughput:
    
    - On the **FE** side, every polling round (default `max_interval` = 10s)
    re-selects a BE via global round-robin, so the task drifts across BEs
    with no job→BE affinity.
    - On the **cdc_client** side, although per-job reader ownership and a
    per-job fixed replication slot already exist, the live reader is not
    actually reused: the stream reader is closed and rebuilt on every round.
    
    As a result, every round rebuilds the reader. For PG, this means
    reconnecting to the replication slot and re-locating the WAL position
    (~15s each round), which, together with large-transaction buffering, is
    a major cause of idle / low-throughput stalls in the incremental phase.
---
 .../doris/job/cdc/request/WriteRecordRequest.java  |   4 +
 .../org/apache/doris/job/common/FailureReason.java |  11 +-
 .../insert/streaming/AbstractStreamingTask.java    |  15 +-
 .../insert/streaming/StreamingInsertJob.java       |  73 +++++++-
 .../insert/streaming/StreamingMultiTblTask.java    |  49 ++++-
 .../org/apache/doris/job/manager/JobManager.java   |   2 +-
 .../doris/job/offset/SourceOffsetProvider.java     |   3 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  34 +++-
 .../apache/doris/job/util/StreamingJobUtils.java   |  18 ++
 .../apache/doris/cdcclient/common/Constants.java   |  12 ++
 .../org/apache/doris/cdcclient/common/Env.java     | 204 ++++++++++++++++++++-
 .../cdcclient/controller/ClientController.java     |  28 ++-
 .../cdcclient/service/PipelineCoordinator.java     |  73 ++++++--
 .../source/reader/AbstractCdcSourceReader.java     |   9 +
 .../source/reader/JdbcIncrementalSourceReader.java |  61 +++++-
 .../source/reader/mysql/MySqlSourceReader.java     |  48 ++++-
 .../reader/postgres/PostgresSourceReader.java      | 144 +++++++++++++--
 ...gres_job_slot_dropped_during_incremental.groovy | 174 ++++++++++++++++++
 18 files changed, 891 insertions(+), 71 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index 511e4fcea74..037ae137763 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -31,4 +31,8 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
     private String token;
     private String taskId;
     private Map<String, String> streamLoadProps;
+    // previous task ended abnormally, rebuild reader instead of reusing
+    private boolean rebuildReader;
+    // off by default: an old FE omits it, so a new cdc_client falls back to 
per-round reader close
+    private boolean reuseReader;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 4280d43bb66..a7c409ae74d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -41,13 +41,22 @@ public class FailureReason implements Writable {
 
     public FailureReason(String msg) {
         this.msg = msg;
-        if (StringUtils.isNotEmpty(msg) && isTooManyFailureRowsErr(msg)) {
+        if (StringUtils.isEmpty(msg)) {
+            this.code = InternalErrorCode.INTERNAL_ERR;
+        } else if (isReplicationSlotInvalidatedErr(msg)) {
+            // A lost/recreated replication slot cannot be resumed without 
data loss; stop auto-resume.
+            this.code = InternalErrorCode.CANNOT_RESUME_ERR;
+        } else if (isTooManyFailureRowsErr(msg)) {
             this.code = InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR;
         } else {
             this.code = InternalErrorCode.INTERNAL_ERR;
         }
     }
 
+    private static boolean isReplicationSlotInvalidatedErr(String msg) {
+        return msg.contains("Replication slot invalidated");
+    }
+
     private static boolean isTooManyFailureRowsErr(String msg) {
         return msg.contains("Insert has filtered data in strict mode")
                 || msg.contains("too many filtered")
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 62adf21daf6..224fe7c5dbb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -43,6 +43,8 @@ public abstract class AbstractStreamingTask {
     private static final int MAX_RETRY = 3;
     private static final String LABEL_SPLITTER = "_";
     private int retryCount = 0;
+    // in-place retry would reuse this taskId, breaking ownership-based zombie 
isolation
+    protected volatile boolean noRetry;
     protected String labelName;
     protected Offset runningOffset;
     protected UserIdentity userIdentity;
@@ -83,6 +85,14 @@ public abstract class AbstractStreamingTask {
 
     public abstract void closeOrReleaseResources();
 
+    // Release the remote cdc reader (keep slot). No-op for tasks without a 
cdc reader (e.g. TVF).
+    public void releaseRemoteReader() {
+    }
+
+    public long getRunningBackendId() {
+        return -1;
+    }
+
     public void execute() throws JobException {
         while (retryCount <= MAX_RETRY) {
             try {
@@ -96,8 +106,9 @@ public abstract class AbstractStreamingTask {
                 }
                 this.errMsg = e.getMessage();
                 retryCount++;
-                if (retryCount > MAX_RETRY) {
-                    log.error("Task execution failed after {} retries.", 
MAX_RETRY, e);
+                if (noRetry || retryCount > MAX_RETRY) {
+                    log.error("Task execution failed, job id {}, task id {}, 
noRetry {}, retry {}.",
+                            jobId, taskId, noRetry, retryCount, e);
                     onFail(e.getMessage());
                     return;
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 855ee75183a..b22ccea0b19 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -77,6 +77,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
 import org.apache.doris.tablefunction.S3TableValuedFunction;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
@@ -185,6 +186,15 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("st")
     private List<String> syncTables;
 
+    // Incremental(binlog) phase: bound BE for reader reuse; <=0 = unbound 
(replay-safe default).
+    @SerializedName("bbe")
+    private volatile long boundBackendId;
+
+    // previous task ended abnormally (or FE restarted), next dispatch must 
rebuild the cdc reader
+    @Getter
+    @Setter
+    private transient volatile boolean needRebuildReader = true;
+
     // The sampling window starts at the beginning of the sampling window.
     // If the error rate exceeds `max_filter_ratio` within the window, the 
sampling fails.
     @Setter
@@ -398,6 +408,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             provider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType, jdbcSourceProps);
         }
         provider.setCloudCluster(this.cloudCluster);
+        provider.setBoundBackendId(boundBackendId);
         return provider;
     }
 
@@ -633,8 +644,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         } else {
             this.runningStreamTask = createStreamingMultiTblTask();
         }
-        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
+        // Set PENDING before registering, else the scheduler thread may set 
RUNNING first and we clobber it.
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
+        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         log.info("create new streaming insert task for job {}, task {} ",
                 getJobId(), runningStreamTask.getTaskId());
         recordTasks(runningStreamTask);
@@ -651,6 +663,18 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 getCreateUser(), cloudCluster);
     }
 
+    // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on 
change.
+    public Backend resolveBoundBackend() throws JobException {
+        Backend selected = StreamingJobUtils.selectBackend(cloudCluster, 
boundBackendId);
+        if (selected.getId() != boundBackendId) {
+            log.info("bind job {} to backend {} (was {})", getJobId(), 
selected.getId(), boundBackendId);
+            boundBackendId = selected.getId();
+            logUpdateOperation();
+        }
+        offsetProvider.setBoundBackendId(boundBackendId);
+        return selected;
+    }
+
     private Map<String, String> getConvertedSourceProperties() throws 
JobException {
         if (convertedSourceProperties == null) {
             this.convertedSourceProperties = 
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
@@ -787,6 +811,28 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    // Command entry for a manual status change: reset the failure/retry 
budget, and on manual pause
+    // release the reader (keep slot). "Manual" is decided by the caller, 
never by reading failureReason.
+    public void onManualStatusAltered(JobStatus newStatus, FailureReason 
reason) {
+        AbstractStreamingTask taskToRelease = null;
+        lock.writeLock().lock();
+        try {
+            resetFailureInfo(reason);
+            if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask != 
null) {
+                // Force resume to swap in a fresh reader, in case the release 
RPC races or fails.
+                this.needRebuildReader = true;
+                taskToRelease = runningStreamTask;
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+        // Release outside the write lock: the RPC may block on first brpc 
connect and this is
+        // best-effort (needRebuildReader already forces a fresh reader; a 
stale release is a no-op).
+        if (taskToRelease != null) {
+            taskToRelease.releaseRemoteReader();
+        }
+    }
+
     public boolean hasMoreDataToConsume() {
         return offsetProvider.hasMoreDataToConsume();
     }
@@ -808,9 +854,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     public void onStreamTaskFail(AbstractStreamingTask task) throws 
JobException {
         try {
+            this.needRebuildReader = true;
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
-            this.failureReason = new FailureReason(task.getErrMsg());
+            // Don't overwrite a manual pause: a late failure callback would 
otherwise let auto resume wake it.
+            if (this.getFailureReason() == null
+                    || 
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
+                this.failureReason = new FailureReason(task.getErrMsg());
+            }
             if (MetricRepo.isInit) {
                 
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
             }
@@ -822,6 +873,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     public void onStreamTaskSuccess(AbstractStreamingTask task) throws 
JobException {
         try {
+            this.needRebuildReader = false;
             resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
             lastTaskSuccessTime = System.currentTimeMillis();
@@ -997,6 +1049,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         setFailedTaskCount(replayJob.getFailedTaskCount());
         setCanceledTaskCount(replayJob.getCanceledTaskCount());
         setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime());
+        this.boundBackendId = replayJob.boundBackendId;
     }
 
     /**
@@ -1419,6 +1472,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (null == lock) {
             this.lock = new ReentrantReadWriteLock(true);
         }
+
+        // a stale reader may survive on the bound BE across FE 
restart/failover
+        this.needRebuildReader = true;
     }
 
     /**
@@ -1493,6 +1549,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                             getJobId(), this.runningStreamTask.getTaskId(), 
offsetRequest.getTaskId());
                     return;
                 }
+                // Reject a late commit from an already-failed task (e.g. brpc 
timeout) reviving a paused job.
+                if 
(!TaskStatus.RUNNING.equals(this.runningStreamTask.getStatus())) {
+                    log.info("Streaming multi table job {} skip late commit 
offset on non-running task {} "
+                                    + "(status: {}, actual: {})",
+                            getJobId(), this.runningStreamTask.getTaskId(),
+                            this.runningStreamTask.getStatus(), 
offsetRequest.getTaskId());
+                    return;
+                }
                 if (this.runningStreamTask.getTaskId() != 
offsetRequest.getTaskId()) {
                     throw new JobException("Task id mismatch when commit 
offset. expected: "
                             + this.runningStreamTask.getTaskId() + ", actual: 
" + offsetRequest.getTaskId());
@@ -1576,6 +1640,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (offsetProvider != null) {
             // when fe restart, offsetProvider.jobId/sourceProperties may be 
null
             offsetProvider.ensureInitialized(getJobId(), getProviderProps());
+            // replayOnUpdated skips the transient provider; resync routing BE.
+            offsetProvider.setBoundBackendId(boundBackendId);
             offsetProvider.replayIfNeed(this);
         }
     }
@@ -1618,7 +1684,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             } catch (JobException ex) {
                 log.warn("refresh provider props before cleanMeta failed, job 
id: {}", getJobId(), ex);
             }
-            ((JdbcSourceOffsetProvider) 
this.offsetProvider).cleanMeta(getJobId());
+            long runtimeBackendId = runningStreamTask != null ? 
runningStreamTask.getRunningBackendId() : -1;
+            ((JdbcSourceOffsetProvider) 
this.offsetProvider).cleanMeta(getJobId(), runtimeBackendId);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 85525beb1f8..bf41d674afc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -131,7 +131,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     }
 
     private void sendWriteRequest() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+        Backend backend = resolveBackend();
         log.info("start to run streaming multi task {} in backend {}/{}, 
offset is {}",
                 taskId, backend.getId(), backend.getHost(), 
runningOffset.toString());
         this.runningBackendId = backend.getId();
@@ -172,13 +172,36 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
             log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} 
jobId={} backend={}:{} timeout_sec={}",
                     taskId, getJobId(), backend.getHost(), 
backend.getBrpcPort(),
                     Config.streaming_cdc_heavy_rpc_timeout_sec);
+            // the request may have been dispatched and still running remotely
+            noRetry = true;
             throw new JobException("cdc_client RPC timeout: /api/writeRecords 
taskId=" + taskId);
         } catch (ExecutionException | InterruptedException ex) {
+            if (ex instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
             log.error("Send write request failed: ", ex);
+            noRetry = true;
             throw new JobException(ex);
         }
     }
 
+    private Backend resolveBackend() throws JobException {
+        // Snapshot phase keeps per-round selection; binlog phase binds to a 
fixed BE for reuse.
+        if (((JdbcOffset) runningOffset).snapshotSplit()) {
+            return StreamingJobUtils.selectBackend(cloudCluster);
+        }
+        return getStreamingJob().resolveBoundBackend();
+    }
+
+    // Fail loud on a dropped/wrong-type job rather than return null and risk 
a downstream NPE.
+    private StreamingInsertJob getStreamingJob() throws JobException {
+        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+        if (job == null) {
+            throw new JobException("Streaming job " + getJobId() + " not 
found");
+        }
+        return (StreamingInsertJob) job;
+    }
+
     private String getToken() throws JobException {
         String token = "";
         try {
@@ -214,6 +237,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         request.setFrontendAddress(feAddr);
         request.setMaxInterval(jobProperties.getMaxIntervalSecond());
         request.setTaskTimeoutMs(getTaskTimeoutMs());
+        request.setRebuildReader(getStreamingJob().isNeedRebuildReader());
+        // Reader reuse applies only to the binlog phase (snapshot 
rebinds/closes per split).
+        request.setReuseReader(!offset.snapshotSplit());
         if (offsetProvider instanceof JdbcSourceOffsetProvider) {
             String schemas = ((JdbcSourceOffsetProvider) 
offsetProvider).getTableSchemas();
             if (schemas != null) {
@@ -312,28 +338,31 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
 
     @Override
     protected void onFail(String errMsg) throws JobException {
-        // Release this task's reader before reschedule so it stops competing 
for the shared slot.
+        // Stop a possibly still-running reader now, so the PG slot frees 
before auto-resume re-acquires it.
         releaseRemoteReader();
         super.onFail(errMsg);
     }
 
     @Override
     public void cancel(boolean needWaitCancelComplete) {
-        // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing 
would orphan the engine.
+        // No release here: drop/stop free via /api/close and manual pause via 
/api/releaseReader;
+        // releasing in cancel would orphan the reused engine.
         super.cancel(needWaitCancelComplete);
     }
 
     @Override
     public void closeOrReleaseResources() {
-        // No-op: reader is shared across tasks; release on reschedule is done 
in onFail().
+        // No-op: the reader is async and reused; releasing here 
(per-iteration finally) would kill it.
     }
 
-    /**
-     * Best-effort, onFail reschedule only: stop this job's reader on {@link 
#runningBackendId} so a
-     * reschedule to another backend never leaves two readers competing for 
the same source (e.g. one
-     * PG replication slot, which is kept, not dropped). Failures are 
swallowed and must not block
-     * rescheduling.
-     */
+    @Override
+    public long getRunningBackendId() {
+        return runningBackendId;
+    }
+
+    // Best-effort release on runningBackendId (keep slot): on task failure to 
stop a stuck/zombie
+    // reader early, and on manual pause so resume can rebind. Failures 
swallowed; idle reaper backs up.
+    @Override
     public void releaseRemoteReader() {
         if (runningBackendId <= 0) {
             return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 23f51890da5..8c55856da57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -282,7 +282,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
                     checkSameStatus(a, jobStatus);
                     alterJobStatus(a.getJobId(), jobStatus);
                     if (a instanceof StreamingInsertJob) {
-                        ((StreamingInsertJob) a).resetFailureInfo(reason);
+                        ((StreamingInsertJob) 
a).onManualStatusAltered(jobStatus, reason);
                     }
                 } catch (JobException e) {
                     throw new JobException("Alter job status error, jobName is 
%s, errorMsg is %s",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index fe03208bd18..58c83eac632 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -94,6 +94,9 @@ public interface SourceOffsetProvider {
      */
     default void setCloudCluster(String cloudCluster) {}
 
+    /** Bind the BE this job is pinned to in the binlog phase, for 
reader-reuse heartbeat routing. */
+    default void setBoundBackendId(long boundBackendId) {}
+
     /**
      * Fetch remote meta information, such as listing files in S3 or getting 
latest offsets in Kafka.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index f80147e970f..bb8e59326ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -107,6 +107,9 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     transient volatile String cloudCluster;
 
+    // Route fetchEndOffset/compareOffset to the bound BE (synced from job, 
not persisted).
+    transient volatile long boundBackendId;
+
     /** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on 
restart. */
     transient SplitProgress cdcSplitProgress = new SplitProgress();
 
@@ -254,9 +257,14 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         this.currentOffset = newOffset;
     }
 
+    @Override
+    public void setBoundBackendId(long boundBackendId) {
+        this.boundBackendId = boundBackendId;
+    }
+
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
-        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster, 
boundBackendId);
         JobBaseConfig requestParams =
                 new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -354,7 +362,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     private boolean compareOffset(Map<String, String> offsetFirst, Map<String, 
String> offsetSecond)
             throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster, 
boundBackendId);
         CompareOffsetRequest requestParams =
                 new CompareOffsetRequest(getJobId(), sourceType.name(), 
sourceProperties,
                         getFrontendAddress(), offsetFirst, offsetSecond);
@@ -999,10 +1007,20 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         }
     }
 
-    public void cleanMeta(Long jobId) throws JobException {
+    public void cleanMeta(Long jobId, long runtimeBackendId) throws 
JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
-        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+        // Dropping the slot only succeeds on the BE owning the live reader 
(it stops its own engine
+        // first, freeing the slot). Prefer the runtime BE (covers the unbound 
snapshot phase), then
+        // the bound BE; both may be alive but not load-available, so route by 
isAlive. Only when
+        // neither is alive fall back to a random BE to drop the now-inactive 
slot.
+        Backend backend = aliveBackend(runtimeBackendId);
+        if (backend == null) {
+            backend = aliveBackend(boundBackendId);
+        }
+        if (backend == null) {
+            backend = StreamingJobUtils.selectBackend(cloudCluster, 
boundBackendId);
+        }
         JobBaseConfig requestParams =
                 new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -1027,6 +1045,14 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         }
     }
 
+    private static Backend aliveBackend(long backendId) {
+        if (backendId <= 0) {
+            return null;
+        }
+        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+        return be != null && be.isAlive() ? be : null;
+    }
+
     private String getFrontendAddress() {
         return Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index cff663e0142..ff67d2429e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -274,6 +274,11 @@ public class StreamingJobUtils {
     }
 
     public static Backend selectBackend(String cloudCluster) throws 
JobException {
+        return selectBackend(cloudCluster, -1);
+    }
+
+    // Prefer preferredBackendId if it is in the cluster's available BEs (also 
enforces cloud group).
+    public static Backend selectBackend(String cloudCluster, long 
preferredBackendId) throws JobException {
         if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
             List<Backend> bes = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
                     .getBackendsByClusterName(cloudCluster)
@@ -284,10 +289,23 @@ public class StreamingJobUtils {
                 throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
                         + ", compute_group: " + cloudCluster);
             }
+            if (preferredBackendId > 0) {
+                for (Backend be : bes) {
+                    if (be.getId() == preferredBackendId) {
+                        return be;
+                    }
+                }
+            }
             int idx = getLastSelectedBackendIndexAndUpdate();
             return bes.get(Math.floorMod(idx, bes.size()));
         }
 
+        if (preferredBackendId > 0) {
+            Backend bound = 
Env.getCurrentSystemInfo().getBackend(preferredBackendId);
+            if (bound != null && bound.isLoadAvailable()) {
+                return bound;
+            }
+        }
         BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
                 .setEnableRoundRobin(true).needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 953903a8032..a9eea173d4d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -25,4 +25,16 @@ public class Constants {
     public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
 
     public static final String DORIS_TARGET_DB = "doris_target_db";
+
+    // Background cleanup tick: idle-reader release + retrying deferred slot 
drops.
+    public static final long BACKGROUND_CLEANUP_INTERVAL_MS = 15_000L;
+    // Idle from-to reader cleanup: release (keep slot) when idle past 
MULTIPLIER * max_interval.
+    public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 10;
+    // Floor the idle timeout: PG reader rebuild is costly, absorb heartbeat 
jitter at small
+    // interval.
+    public static final long IDLE_READER_MIN_TIMEOUT_MS = 90_000L;
+
+    // Retry dropping a slot still held by a dead BE until it frees 
(wal_sender_timeout) or this
+    // elapses.
+    public static final long SLOT_DROP_RETRY_WINDOW_MS = 300_000L;
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 28da598053b..b39efd01324 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -19,13 +19,18 @@ package org.apache.doris.cdcclient.common;
 
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.cdcclient.source.factory.SourceReaderFactory;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
 import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
+import org.apache.doris.job.cdc.request.WriteRecordRequest;
 
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -40,6 +45,8 @@ public class Env {
     private static volatile Env INSTANCE;
     private final Map<String, JobContext> jobContexts;
     private final Map<String, Lock> jobLocks;
+    private final Map<String, SlotDropTask> pendingSlotDrops;
+    private final ScheduledExecutorService backgroundCleaner;
     @Setter private int backendHttpPort;
     @Setter @Getter private String clusterToken;
     @Setter @Getter private volatile String feMasterAddress;
@@ -47,6 +54,19 @@ public class Env {
     private Env() {
         this.jobContexts = new ConcurrentHashMap<>();
         this.jobLocks = new ConcurrentHashMap<>();
+        this.pendingSlotDrops = new ConcurrentHashMap<>();
+        this.backgroundCleaner =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread t = new Thread(r, "cdc-background-cleaner");
+                            t.setDaemon(true);
+                            return t;
+                        });
+        this.backgroundCleaner.scheduleWithFixedDelay(
+                this::runBackgroundCleanup,
+                Constants.BACKGROUND_CLEANUP_INTERVAL_MS,
+                Constants.BACKGROUND_CLEANUP_INTERVAL_MS,
+                TimeUnit.MILLISECONDS);
     }
 
     public String getBackendHostPort() {
@@ -79,6 +99,22 @@ public class Env {
         return context == null ? null : context.reader;
     }
 
+    /**
+     * Reader for stateless metadata ops (end offset / compare): reuse the 
live one if present, else
+     * a throwaway instance. Never create/cache/initialize a heavy reader, so 
a metadata RPC for an
+     * idle/absent job can't trigger pub/slot/schema (re)initialization or 
leak an unreaped context.
+     */
+    public SourceReader getMetaReader(JobBaseConfig jobConfig) {
+        if (jobConfig.getFrontendAddress() != null && 
!jobConfig.getFrontendAddress().isEmpty()) {
+            this.feMasterAddress = jobConfig.getFrontendAddress();
+        }
+        SourceReader existing = getReaderIfPresent(jobConfig.getJobId());
+        if (existing != null) {
+            return existing;
+        }
+        return 
SourceReaderFactory.createSourceReader(resolveDataSource(jobConfig.getDataSource()));
+    }
+
     /**
      * Get-or-create this job's reader and claim ownership for {@code taskId} 
atomically under the
      * per-job lock, so a concurrent stale release cannot stop a reader this 
task is about to use.
@@ -90,9 +126,24 @@ public class Env {
         DataSource ds = resolveDataSource(jobConfig.getDataSource());
         String jobId = jobConfig.getJobId();
         Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+        SourceReader staleReader = null;
+        JobBaseConfig staleConfig = null;
+        SourceReader reader;
         lock.lock();
         try {
             JobContext context = jobContexts.get(jobId);
+            if (context != null
+                    && jobConfig instanceof WriteRecordRequest
+                    && ((WriteRecordRequest) jobConfig).isRebuildReader()) {
+                // FE declared the previous task abnormal: swap in a fresh 
reader instance so the
+                // old task's thread can never reach the new fetcher.
+                LOG.info(
+                        "Rebuild reader for job {} on FE request, discard 
current instance", jobId);
+                jobContexts.remove(jobId);
+                staleReader = context.reader;
+                staleConfig = context.jobConfig != null ? context.jobConfig : 
jobConfig;
+                context = null;
+            }
             if (context == null) {
                 LOG.info("Creating new reader for job {}, dataSource {}", 
jobId, ds);
                 context = new JobContext(jobId, ds, jobConfig.getConfig());
@@ -100,10 +151,30 @@ public class Env {
                 jobContexts.put(jobId, context);
             }
             context.ownerTaskId = taskId;
-            return context.getReader(ds);
+            context.jobConfig = jobConfig;
+            if (jobConfig instanceof WriteRecordRequest) {
+                context.maxIntervalMs = ((WriteRecordRequest) 
jobConfig).getMaxInterval() * 1000;
+            }
+            context.lastAliveTime = System.currentTimeMillis();
+            reader = context.getReader(ds);
         } finally {
             lock.unlock();
         }
+        if (staleReader != null) {
+            // free the engine/slot connection before the caller submits the 
new fetcher
+            try {
+                staleReader.release(staleConfig);
+            } catch (Exception ex) {
+                LOG.warn("Failed to release stale reader for job {}", jobId, 
ex);
+            }
+        }
+        return reader;
+    }
+
+    /** Whether {@code taskId} is still the current claimer of this job's 
reader. */
+    public boolean isOwner(String jobId, String taskId) {
+        JobContext context = jobContexts.get(jobId);
+        return context != null && Objects.equals(context.ownerTaskId, taskId);
     }
 
     /**
@@ -192,12 +263,143 @@ public class Env {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's 
reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }
+
+    // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = 
untracked (e.g. TVF),
+    // skip.
+    private void releaseIdleReaders() {
+        long now = System.currentTimeMillis();
+        for (String jobId : jobContexts.keySet()) {
+            Lock lock = jobLocks.get(jobId);
+            if (lock == null || !lock.tryLock()) {
+                continue;
+            }
+            SourceReader toRelease = null;
+            JobBaseConfig releaseConfig = null;
+            try {
+                JobContext context = jobContexts.get(jobId);
+                if (context == null || context.lastAliveTime <= 0 || 
context.maxIntervalMs <= 0) {
+                    continue;
+                }
+                long timeout =
+                        Math.max(
+                                (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+                                        * context.maxIntervalMs,
+                                Constants.IDLE_READER_MIN_TIMEOUT_MS);
+                if (now - context.lastAliveTime <= timeout) {
+                    continue;
+                }
+                LOG.info(
+                        "Releasing idle reader for job {}, idle {} ms, keep 
slot",
+                        jobId,
+                        now - context.lastAliveTime);
+                jobContexts.remove(jobId);
+                toRelease = context.reader;
+                releaseConfig = context.jobConfig;
+            } finally {
+                lock.unlock();
+            }
+            // Release outside the lock so blocking IO never stalls 
getReaderAndClaim/detach.
+            if (toRelease != null && releaseConfig != null) {
+                try {
+                    toRelease.release(releaseConfig);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to release idle reader for job {}", 
jobId, ex);
+                }
+            }
+        }
+    }
+
+    // Each chore is guarded independently: one failing must not skip the 
other, and an uncaught
+    // throwable here would silently cancel the whole periodic task.
+    private void runBackgroundCleanup() {
+        try {
+            releaseIdleReaders();
+        } catch (Exception e) {
+            LOG.warn("releaseIdleReaders failed", e);
+        }
+        try {
+            retryPendingSlotDrops();
+        } catch (Exception e) {
+            LOG.warn("retryPendingSlotDrops failed", e);
+        }
+    }
+
+    /**
+     * Run source-side cleanup; if incomplete (e.g. slot still held by a dead 
BE), retry in
+     * background.
+     */
+    public void releaseSourceResourcesOrRetry(SourceReader reader, 
JobBaseConfig jobConfig) {
+        if (!releaseSourceResources(reader, jobConfig)) {
+            scheduleSlotDrop(jobConfig);
+        }
+    }
+
+    public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+        long deadline = System.currentTimeMillis() + 
Constants.SLOT_DROP_RETRY_WINDOW_MS;
+        pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new 
SlotDropTask(jobConfig, deadline));
+        LOG.info("Scheduled background slot drop for job {}", 
jobConfig.getJobId());
+    }
+
+    private boolean releaseSourceResources(SourceReader reader, JobBaseConfig 
jobConfig) {
+        return ((AbstractCdcSourceReader) 
reader).releaseSourceResources(jobConfig);
+    }
+
+    private void retryPendingSlotDrops() {
+        long now = System.currentTimeMillis();
+        for (Map.Entry<String, SlotDropTask> entry : 
pendingSlotDrops.entrySet()) {
+            String jobId = entry.getKey();
+            SlotDropTask task = entry.getValue();
+            boolean done = false;
+            try {
+                SourceReader reader =
+                        SourceReaderFactory.createSourceReader(
+                                
resolveDataSource(task.jobConfig.getDataSource()));
+                done = releaseSourceResources(reader, task.jobConfig);
+            } catch (Exception ex) {
+                LOG.warn(
+                        "Background slot drop attempt failed for job {}: {}",
+                        jobId,
+                        ex.getMessage());
+            }
+            if (done) {
+                pendingSlotDrops.remove(jobId);
+                LOG.info("Background slot drop succeeded for job {}", jobId);
+            } else if (now >= task.deadlineMs) {
+                pendingSlotDrops.remove(jobId);
+                LOG.warn(
+                        "Gave up dropping replication slot for job {} after 
retry window; "
+                                + "manual cleanup may be needed",
+                        jobId);
+            }
+        }
+    }
+
+    private static final class SlotDropTask {
+        private final JobBaseConfig jobConfig;
+        private final long deadlineMs;
+
+        private SlotDropTask(JobBaseConfig jobConfig, long deadlineMs) {
+            this.jobConfig = jobConfig;
+            this.deadlineMs = deadlineMs;
+        }
+    }
+
     private static final class JobContext {
         private final String jobId;
         private volatile SourceReader reader;
         private volatile String ownerTaskId;
         private volatile Map<String, String> config;
         private volatile DataSource dataSource;
+        private volatile JobBaseConfig jobConfig;
+        private volatile long maxIntervalMs;
+        private volatile long lastAliveTime;
 
         private JobContext(String jobId, DataSource dataSource, Map<String, 
String> config) {
             this.jobId = jobId;
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 1ac874ceeaa..b1dbe773844 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -106,7 +106,8 @@ public class ClientController {
     public Object fetchEndOffset(@RequestBody JobBaseConfig jobConfig) {
         LOG.info("Fetching end offset for job {}", jobConfig.getJobId());
         try {
-            SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
+            SourceReader reader = Env.getCurrentEnv().getMetaReader(jobConfig);
+            Env.getCurrentEnv().keepAlive(jobConfig.getJobId());
             return RestResponse.success(reader.getEndOffset(jobConfig));
         } catch (Exception ex) {
             LOG.error("Failed to fetch end offset, jobId={}", 
jobConfig.getJobId(), ex);
@@ -118,7 +119,7 @@ public class ClientController {
     @RequestMapping(path = "/api/compareOffset", method = RequestMethod.POST)
     public Object compareOffset(@RequestBody CompareOffsetRequest 
compareOffsetRequest) {
         try {
-            SourceReader reader = 
Env.getCurrentEnv().getReader(compareOffsetRequest);
+            SourceReader reader = 
Env.getCurrentEnv().getMetaReader(compareOffsetRequest);
             return 
RestResponse.success(reader.compareOffset(compareOffsetRequest));
         } catch (Exception ex) {
             LOG.error("Failed to compare offset, jobId={}", 
compareOffsetRequest.getJobId(), ex);
@@ -129,12 +130,25 @@ public class ClientController {
     /** Close job */
     @RequestMapping(path = "/api/close", method = RequestMethod.POST)
     public Object close(@RequestBody JobBaseConfig jobConfig) {
-        LOG.info("Closing job {}", jobConfig.getJobId());
+        String jobId = jobConfig.getJobId();
+        LOG.info("Closing job {}", jobId);
         Env env = Env.getCurrentEnv();
-        SourceReader reader = env.getReader(jobConfig);
-        reader.close(jobConfig);
-        env.close(jobConfig.getJobId());
-        pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
+        // Don't rebuild a reader to close it; an absent reader (owner BE 
gone) just needs its slot
+        // dropped.
+        SourceReader reader = env.getReaderIfPresent(jobId);
+        try {
+            if (reader != null) {
+                reader.release(jobConfig);
+            }
+            SourceReader dropper = reader != null ? reader : 
env.getMetaReader(jobConfig);
+            env.releaseSourceResourcesOrRetry(dropper, jobConfig);
+        } catch (Exception ex) {
+            LOG.warn("Close job {} teardown failed: {}", jobId, 
ex.getMessage());
+            env.scheduleSlotDrop(jobConfig);
+        } finally {
+            env.close(jobId);
+            pipelineCoordinator.closeJobStreamLoad(jobId);
+        }
         return RestResponse.success(true);
     }
 
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 51871db92b1..735b2b6b815 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -383,7 +383,8 @@ public class PipelineCoordinator {
                     System.currentTimeMillis() - startTime,
                     fetchRecord.getJobId());
         } finally {
-            cleanupReaderResources(sourceReader, fetchRecord.getJobId(), 
readResult);
+            // Debug fetch path is out of reuse scope: finish the reader each 
round.
+            cleanupReaderResources(sourceReader, fetchRecord.getJobId(), 
readResult, false);
         }
 
         // Extract and set offset metadata
@@ -410,7 +411,13 @@ public class PipelineCoordinator {
                                 writeRecordRequest.getJobId(),
                                 writeRecordRequest.getTaskId());
                     } catch (Exception ex) {
-                        closeJobStreamLoad(writeRecordRequest.getJobId());
+                        // a displaced task must not close the streamload the 
successor is using
+                        if (Env.getCurrentEnv()
+                                .isOwner(
+                                        writeRecordRequest.getJobId(),
+                                        writeRecordRequest.getTaskId())) {
+                            closeJobStreamLoad(writeRecordRequest.getJobId());
+                        }
                         String rootCauseMessage = 
ExceptionUtils.getRootCauseMessage(ex);
                         taskErrorMaps.put(writeRecordRequest.getTaskId(), 
rootCauseMessage);
                         taskProgressMap.remove(writeRecordRequest.getTaskId());
@@ -468,6 +475,7 @@ public class PipelineCoordinator {
         SplitReadResult readResult = null;
         boolean hasExecuteDDL = false;
         boolean isSnapshotSplit = false;
+        boolean stillOwner = false;
         try {
             // 1. submit split async
             readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
@@ -491,6 +499,8 @@ public class PipelineCoordinator {
 
             // 2. poll record
             while (!shouldStop) {
+                // Active poll keeps the reader alive so the reaper won't 
reclaim it mid-task.
+                Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId());
                 Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
 
                 if (!recordIterator.hasNext()) {
@@ -544,6 +554,21 @@ public class PipelineCoordinator {
                 }
 
                 while (recordIterator.hasNext()) {
+                    // streamload backpressure can stall this loop past the 
reaper timeout
+                    
Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId());
+                    // A successor task took over: stop draining into the 
shared batchStreamLoad.
+                    if (!Env.getCurrentEnv()
+                            .isOwner(
+                                    writeRecordRequest.getJobId(),
+                                    writeRecordRequest.getTaskId())) {
+                        LOG.info(
+                                "Task {} displaced mid-write for job {} after 
{} rows, stop writing",
+                                writeRecordRequest.getTaskId(),
+                                writeRecordRequest.getJobId(),
+                                scannedRows);
+                        shouldStop = true;
+                        break;
+                    }
                     SourceRecord element = recordIterator.next();
 
                     // Check if this is a heartbeat message
@@ -609,7 +634,25 @@ public class PipelineCoordinator {
                     writeRecordRequest.getTaskId());
 
         } finally {
-            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+            stillOwner =
+                    Env.getCurrentEnv()
+                            .isOwner(writeRecordRequest.getJobId(), 
writeRecordRequest.getTaskId());
+            // A displaced task must not touch the reader (finishSplitRecords 
would kill the
+            // successor's fetcher) nor commit anything.
+            if (stillOwner) {
+                cleanupReaderResources(
+                        sourceReader,
+                        writeRecordRequest.getJobId(),
+                        readResult,
+                        writeRecordRequest.isReuseReader());
+            }
+        }
+        if (!stillOwner) {
+            LOG.info(
+                    "Skip commit for job {} task {}: reader released or taken 
over",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+            return;
         }
 
         // 3. Extract offset from split state
@@ -618,7 +661,6 @@ public class PipelineCoordinator {
         batchStreamLoad.forceFlush();
 
         // 5. request fe api update offset
-        String currentTaskId = batchStreamLoad.getCurrentTaskId();
         // The offset must be reset before commitOffset to prevent the next 
taskId from being create
         // by the fe.
         batchStreamLoad.resetTaskId();
@@ -632,13 +674,14 @@ public class PipelineCoordinator {
         if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
             tableSchemas = sourceReader.serializeTableSchemas();
         }
+        // own taskId, never the shared currentTaskId: FE rejects it if 
another task took over
         batchStreamLoad.commitOffset(
-                currentTaskId,
+                writeRecordRequest.getTaskId(),
                 metaResponse,
                 scannedRows,
                 batchStreamLoad.getLoadStatistic(),
                 tableSchemas);
-        taskProgressMap.remove(currentTaskId);
+        taskProgressMap.remove(writeRecordRequest.getTaskId());
     }
 
     public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -772,7 +815,14 @@ public class PipelineCoordinator {
      * @param readResult the read result containing split information
      */
     private void cleanupReaderResources(
-            SourceReader sourceReader, String jobId, SplitReadResult 
readResult) {
+            SourceReader sourceReader,
+            String jobId,
+            SplitReadResult readResult,
+            boolean reuseReader) {
+        boolean isSnapshotSplit =
+                readResult != null
+                        && readResult.getSplit() != null
+                        && sourceReader.isSnapshotSplit(readResult.getSplit());
         try {
             // The LSN in the commit is the current offset, which is the 
offset from the last
             // successful write.
@@ -781,11 +831,10 @@ public class PipelineCoordinator {
                 sourceReader.commitSourceOffset(jobId, readResult.getSplit());
             }
         } finally {
-            // This must be called after `commitSourceOffset`; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
`batchStreamLoad.commitOffset`;
-            // otherwise, fe might create the next task for this job.
-            sourceReader.finishSplitRecords();
+            // Keep the binlog reader alive only when FE asked to reuse it; 
else close each round.
+            if (isSnapshotSplit || !reuseReader) {
+                sourceReader.finishSplitRecords();
+            }
         }
     }
 
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 74eb534bc5f..bc941663050 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -74,6 +74,15 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
         // backend to take over.
         LOG.info("Release source reader for job {}", jobConfig.getJobId());
         finishSplitRecords();
+        shutdownSnapshotPollExecutor();
+    }
+
+    /** Stop the snapshot-phase poll thread pool; called when this reader 
instance is discarded. */
+    protected void shutdownSnapshotPollExecutor() {}
+
+    /** Drop source-side owned resources. Returns false if cleanup is 
incomplete (retry later). */
+    public boolean releaseSourceResources(JobBaseConfig jobConfig) {
+        return true;
     }
 
     protected abstract Class<?> probeSplitKeyClass(
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index f0987eb97ee..ea70c432e8e 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -99,7 +99,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
     private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
 
     // Parallel polling support
-    private ExecutorService pollExecutor;
+    private ExecutorService snapshotPollExecutor;
     private volatile List<CompletableFuture<PollResult>> activePollFutures;
 
     // Stream/binlog reader (single reader for stream split)
@@ -123,7 +123,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
                         config.getOrDefault(
                                 DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
                                 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
-        this.pollExecutor =
+        this.snapshotPollExecutor =
                 Executors.newFixedThreadPool(
                         parallelism,
                         r -> {
@@ -358,7 +358,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
                                         split.splitId(),
                                         split.getTableId().identifier());
                             },
-                            pollExecutor));
+                            snapshotPollExecutor));
         }
 
         CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
@@ -407,11 +407,40 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
                     baseReq.getJobId());
         }
         Tuple2<SourceSplitBase, Boolean> splitFlag = 
createStreamSplit(offsetMeta, baseReq);
-        this.streamSplit = splitFlag.f0.asStreamSplit();
+        StreamSplit newStreamSplit = splitFlag.f0.asStreamSplit();
+
+        // offset guard: reuse only when request start == reader's consumed 
position. Compare by
+        // compareTo (LSN), NOT equals -- PG drops lsn_proc/commit so same 
position differs by map.
+        if (this.streamReader != null && this.streamSplitState != null) {
+            Offset requestStart = newStreamSplit.getStartingOffset();
+            Offset readerPos = this.streamSplitState.getStartingOffset();
+            if (requestStart != null
+                    && readerPos != null
+                    && requestStart.compareTo(readerPos) == 0) {
+                LOG.info(
+                        "Reuse live stream reader for job {} at offset {}",
+                        baseReq.getJobId(),
+                        requestStart);
+                // Refresh split so commitSourceOffset advances PG 
confirmed_lsn to the FE-committed
+                // offset (== reader pos); poll/offset keep using 
streamSplitState.
+                this.streamSplit = newStreamSplit;
+                SplitReadResult reuseResult = new SplitReadResult();
+                
reuseResult.setSplits(Collections.singletonList(this.streamSplit));
+                Map<String, Object> reuseStates = new HashMap<>();
+                reuseStates.put(this.streamSplit.splitId(), 
this.streamSplitState);
+                reuseResult.setSplitStates(reuseStates);
+                return reuseResult;
+            }
+            LOG.info(
+                    "Rebuild stream reader for job {}: requestStart={}, 
readerPos={}",
+                    baseReq.getJobId(),
+                    requestStart,
+                    readerPos);
+        }
 
-        // Close previous stream reader to release resources (e.g. PG 
replication slot)
-        // before creating a new one. This prevents connection leaks when a 
cancelled
-        // task's reader is still active while a new task arrives.
+        this.streamSplit = newStreamSplit;
+        // Close previous stream reader (rebuild path) before creating a new 
one. This prevents
+        // connection leaks when a cancelled task's reader is still active 
while a new task arrives.
         if (this.streamReader != null) {
             LOG.info(
                     "Closing previous stream reader before creating new one 
for job {}",
@@ -420,6 +449,10 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
             this.streamReader = null;
         }
 
+        // Rebuild path: fail loudly if the source position is gone (e.g. slot 
dropped) instead of
+        // silently re-locating from a lost offset.
+        validateStreamSource(offsetMeta, baseReq);
+
         this.streamReader = getBinlogSplitReader(baseReq);
 
         LOG.info("Prepare stream split: {}", this.streamSplit.toString());
@@ -443,6 +476,10 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
         return result;
     }
 
+    // Source-specific check before (re)building the stream reader; default 
no-op.
+    protected void validateStreamSource(
+            Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {}
+
     @Override
     public Iterator<SourceRecord> pollRecords() throws Exception {
         if (!snapshotReaderContexts.isEmpty()) {
@@ -550,7 +587,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
                                 }
                                 return null;
                             },
-                            pollExecutor);
+                            snapshotPollExecutor);
 
             activePollFutures.add(future);
         }
@@ -965,12 +1002,20 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
     public synchronized void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
         finishSplitRecords();
+        shutdownSnapshotPollExecutor();
         if (tableSchemas != null) {
             tableSchemas.clear();
             tableSchemas = null;
         }
     }
 
+    @Override
+    protected void shutdownSnapshotPollExecutor() {
+        if (snapshotPollExecutor != null) {
+            snapshotPollExecutor.shutdownNow();
+        }
+    }
+
     @Override
     public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 72a8338580d..efea979a62c 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -127,7 +127,7 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
     private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
 
     // Parallel polling support
-    private ExecutorService pollExecutor;
+    private ExecutorService snapshotPollExecutor;
     private volatile List<CompletableFuture<PollResult>> activePollFutures;
 
     // Binlog reader (single reader for binlog split)
@@ -149,7 +149,7 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
                         config.getOrDefault(
                                 DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
                                 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
-        this.pollExecutor =
+        this.snapshotPollExecutor =
                 Executors.newFixedThreadPool(
                         parallelism,
                         r -> {
@@ -404,7 +404,7 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
                                         split.splitId(),
                                         split.getTableId().identifier());
                             },
-                            pollExecutor));
+                            snapshotPollExecutor));
         }
 
         CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
@@ -434,7 +434,37 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         // Load tableSchemas from FE if available (avoids re-discover on 
restart)
         tryLoadTableSchemasFromRequest(baseReq);
         Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, 
baseReq);
-        this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+        MySqlBinlogSplit newBinlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+
+        // offset guard: reuse the live binlog reader only when the request start offset equals the
+        // reader's real consumed position, so steady-state rounds skip reconnect + binlog
+        // re-position.
+        if (this.binlogReader != null && this.binlogSplitState != null) {
+            BinlogOffset requestStart = newBinlogSplit.getStartingOffset();
+            BinlogOffset readerPos = this.binlogSplitState.getStartingOffset();
+            if (requestStart != null
+                    && readerPos != null
+                    && requestStart.compareTo(readerPos) == 0) {
+                LOG.info(
+                        "Reuse live binlog reader for job {} at offset {}",
+                        baseReq.getJobId(),
+                        requestStart);
+                this.binlogSplit = newBinlogSplit;
+                SplitReadResult reuseResult = new SplitReadResult();
+                
reuseResult.setSplits(Collections.singletonList(this.binlogSplit));
+                Map<String, Object> reuseStates = new HashMap<>();
+                reuseStates.put(this.binlogSplit.splitId(), 
this.binlogSplitState);
+                reuseResult.setSplitStates(reuseStates);
+                return reuseResult;
+            }
+            LOG.info(
+                    "Rebuild binlog reader for job {}: requestStart={}, 
readerPos={}",
+                    baseReq.getJobId(),
+                    requestStart,
+                    readerPos);
+        }
+
+        this.binlogSplit = newBinlogSplit;
 
         // Close previous binlog reader to release resources before creating a 
new one.
         // This prevents connection leaks when a cancelled task's reader is 
still active
@@ -568,7 +598,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                                 }
                                 return null;
                             },
-                            pollExecutor);
+                            snapshotPollExecutor);
 
             activePollFutures.add(future);
         }
@@ -1195,12 +1225,20 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     public synchronized void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
         finishSplitRecords();
+        shutdownSnapshotPollExecutor();
         if (tableSchemas != null) {
             tableSchemas.clear();
             tableSchemas = null;
         }
     }
 
+    @Override
+    protected void shutdownSnapshotPollExecutor() {
+        if (snapshotPollExecutor != null) {
+            snapshotPollExecutor.shutdownNow();
+        }
+    }
+
     @Override
     public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 027a4c7af94..fe37d5c392f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -27,6 +27,7 @@ import org.apache.doris.cdcclient.utils.SmallFileMgr;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -73,6 +74,7 @@ import 
io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresConnectorConfig.AutoCreateMode;
 import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.Lsn;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
@@ -491,6 +493,59 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         }
     }
 
+    // Detect a replication slot that was dropped (or dropped and recreated) out from under us while
+    // the job was paused/retrying. Recreating it silently would resume from a position whose WAL is
+    // already gone -> data loss. Fail with a fixed marker so FE classifies it as non-resumable.
+    @Override
+    protected void validateStreamSource(
+            Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
+        PostgresSourceConfig sourceConfig = getSourceConfig(baseReq);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            SlotState slotState =
+                    connection.getReplicationSlotState(
+                            dialect.getSlotName(), dialect.getPluginName());
+            if (slotState == null) {
+                throw new CdcClientException(
+                        String.format(
+                                "Replication slot invalidated for job %s: slot 
%s not found on the"
+                                        + " upstream (dropped externally), 
cannot resume from the"
+                                        + " committed position without data 
loss.",
+                                baseReq.getJobId(), dialect.getSlotName()));
+            }
+            Lsn requestedLsn = extractRequestedLsn(offsetMeta);
+            Lsn restartLsn = slotState.slotRestartLsn();
+            // restart_lsn must stay <= the committed position; a higher one means the slot
+            // was recreated and the WAL between them was discarded, so resuming would
+            // silently skip data.
+            if (requestedLsn != null
+                    && requestedLsn.asLong() > 0
+                    && restartLsn != null
+                    && restartLsn.compareTo(requestedLsn) > 0) {
+                throw new CdcClientException(
+                        String.format(
+                                "Replication slot invalidated for job %s: slot 
%s restart_lsn %s is"
+                                        + " ahead of the committed position %s 
(slot recreated),"
+                                        + " cannot resume without data loss.",
+                                baseReq.getJobId(),
+                                dialect.getSlotName(),
+                                restartLsn,
+                                requestedLsn));
+            }
+        }
+    }
+
+    private Lsn extractRequestedLsn(Map<String, Object> offsetMeta) {
+        if (offsetMeta == null || offsetMeta.get(SourceInfo.LSN_KEY) == null) {
+            return null;
+        }
+        try {
+            return 
Lsn.valueOf(Long.parseLong(String.valueOf(offsetMeta.get(SourceInfo.LSN_KEY))));
+        } catch (NumberFormatException ex) {
+            return null;
+        }
+    }
+
     @Override
     public int compareOffset(CompareOffsetRequest compareOffsetRequest) {
         Map<String, String> offsetFirst = 
compareOffsetRequest.getOffsetFirst();
@@ -606,6 +661,15 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
     @Override
     public void close(JobBaseConfig jobConfig) {
         super.close(jobConfig);
+        releaseSourceResources(jobConfig);
+    }
+
+    /**
+     * Drop the Doris-owned slot/publication. Returns false if either is still present (e.g. a dead
+     * BE's stale walsender holds the slot until PG reclaims it), so the caller can retry later.
+     */
+    @Override
+    public boolean releaseSourceResources(JobBaseConfig jobConfig) {
         Map<String, String> config = jobConfig.getConfig();
         String jobId = jobConfig.getJobId();
         String slotName = resolveSlotName(config, jobId);
@@ -618,29 +682,75 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
                     slotName,
                     pubName,
                     jobId);
-            return;
+            return true;
         }
-        try {
-            PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
-            PostgresDialect dialect = new PostgresDialect(sourceConfig);
-            if (dropSlot) {
-                LOG.info("Dropping auto-created replication slot {} for job 
{}", slotName, jobId);
+        PostgresDialect dialect = new 
PostgresDialect(getSourceConfig(jobConfig));
+        boolean cleaned = true;
+        if (dropPub) {
+            LOG.info("Dropping auto-created publication {} for job {}", 
pubName, jobId);
+            try (PostgresConnection connection = dialect.openJdbcConnection()) 
{
+                connection.execute("DROP PUBLICATION IF EXISTS " + pubName);
+            } catch (Exception ex) {
+                LOG.warn(
+                        "Failed to drop publication {} for job {}: {}",
+                        pubName,
+                        jobId,
+                        ex.getMessage());
+            }
+            if (publicationExists(dialect, pubName)) {
+                LOG.warn(
+                        "Publication {} for job {} still present after drop, 
will retry",
+                        pubName,
+                        jobId);
+                cleaned = false;
+            }
+        }
+        if (dropSlot) {
+            LOG.info("Dropping auto-created replication slot {} for job {}", 
slotName, jobId);
+            try {
                 dialect.removeSlot(slotName);
-            } else {
-                LOG.info("Skipping drop of user-provided slot {} for job {}", 
slotName, jobId);
+            } catch (Exception ex) {
+                LOG.warn(
+                        "Drop of replication slot {} for job {} failed: {}",
+                        slotName,
+                        jobId,
+                        ex.getMessage());
             }
-            if (dropPub) {
-                LOG.info("Dropping auto-created publication {} for job {}", 
pubName, jobId);
-                try (PostgresConnection connection = 
dialect.openJdbcConnection()) {
-                    connection.execute("DROP PUBLICATION IF EXISTS " + 
pubName);
-                }
-            } else {
-                LOG.info(
-                        "Skipping drop of user-provided publication {} for job 
{}", pubName, jobId);
+            if (slotExists(dialect, slotName)) {
+                LOG.warn(
+                        "Replication slot {} for job {} still present after 
drop, will retry",
+                        slotName,
+                        jobId);
+                cleaned = false;
             }
+        }
+        return cleaned;
+    }
+
+    private boolean slotExists(PostgresDialect dialect, String slotName) {
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return connection.queryAndMap(
+                    "SELECT 1 FROM pg_replication_slots WHERE slot_name = '" + 
slotName + "'",
+                    rs -> rs.next());
         } catch (Exception ex) {
+            // Can't verify -> treat as present so the bounded retry keeps trying instead of
+            // leaking.
             LOG.warn(
-                    "Failed to clean up postgres resources for job {}: {}", 
jobId, ex.getMessage());
+                    "Failed to check replication slot {} existence: {}", 
slotName, ex.getMessage());
+            return true;
+        }
+    }
+
+    private boolean publicationExists(PostgresDialect dialect, String pubName) 
{
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return connection.queryAndMap(
+                    "SELECT 1 FROM pg_publication WHERE pubname = '" + pubName 
+ "'",
+                    rs -> rs.next());
+        } catch (Exception ex) {
+            // Can't verify -> treat as present so the bounded retry keeps trying instead of
+            // leaking.
+            LOG.warn("Failed to check publication {} existence: {}", pubName, 
ex.getMessage());
+            return true;
         }
     }
 }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
new file mode 100644
index 00000000000..bbca6e22830
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// When the replication slot is dropped out from under a running incremental job,
+// resuming from the committed position would silently skip the WAL discarded with
+// the slot. On rebuild, validateStreamSource detects the missing slot and fails with
+// the "Replication slot invalidated" marker; FE classifies that as CANNOT_RESUME_ERR,
+// so the job settles in PAUSED and is NOT pulled back to RUNNING by auto-resume.
+//
+// Uses a user-provided slot so (1) the slot name is known up front and (2) Doris does
+// not auto-recreate it (createSlotForGlobalStreamSplit only fires for Doris-owned
+// slots), keeping the "slot not found" branch deterministic.
+//
+// We assert state + error marker only — NOT a hard slot-count check, which is
+// TOCTOU-flaky against the cdc_client winding down its connection.
+suite("test_streaming_postgres_job_slot_dropped_during_incremental",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_slot_dropped_job"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "slot_dropped_pg_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def userSlot = "slot_dropped_user_slot"
+    def userPub = "slot_dropped_user_pub"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "id" int PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+            sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+            sql """CREATE PUBLICATION ${userPub} FOR TABLE 
${pgDB}.${pgSchema}.${table1}"""
+            def existing = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${userSlot}'"""
+            if (existing[0][0] != 0) {
+                sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+            }
+            sql """SELECT pg_create_logical_replication_slot('${userSlot}', 
'pgoutput')"""
+        }
+
+        // offset=latest skips snapshot and goes straight to streaming; max_interval=3
+        // keeps tasks short so the rebuild after the drop happens quickly.
+        sql """CREATE JOB ${jobName}
+                PROPERTIES ("max_interval" = "3")
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "slot_name" = "${userSlot}",
+                    "publication_name" = "${userPub}",
+                    "offset" = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        Awaitility.await().atMost(120, SECONDS).pollInterval(1, 
SECONDS).until({
+            def st = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            st.size() == 1 && st.get(0).get(0) == "RUNNING"
+        })
+
+        // ===== Phase 1: confirm steady-state incremental sync =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            for (int i = 1; i <= 10; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 
'name_${i}')"""
+            }
+        }
+        Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+            def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+            cnt.size() == 1 && cnt.get(0).get(0) >= 10
+        })
+
+        // ===== Phase 2: drop the slot out from under the running job =====
+        // Terminate whatever consumer holds the slot, then drop it. Retry until gone:
+        // an auto-resume task may briefly re-acquire it before we manage to drop.
+        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
+            boolean dropped = false
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """SELECT pg_terminate_backend(active_pid) FROM 
pg_replication_slots
+                       WHERE slot_name = '${userSlot}' AND active_pid IS NOT 
NULL"""
+                def cnt = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${userSlot}'"""
+                if (cnt[0][0] == 0) {
+                    dropped = true
+                } else {
+                    try {
+                        sql """SELECT 
pg_drop_replication_slot('${userSlot}')"""
+                        dropped = true
+                    } catch (Exception e) {
+                        log.info("slot still active, retry drop: " + 
e.getMessage())
+                    }
+                }
+            }
+            dropped
+        })
+
+        // Generate WAL so the rebuilt reader is forced to locate the 
now-missing slot.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            for (int i = 100; i < 110; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 
'after_drop_${i}')"""
+            }
+        }
+
+        // ===== Phase 3: job settles in PAUSED with the slot-invalidated 
marker =====
+        Awaitility.await().atMost(240, SECONDS).pollInterval(3, 
SECONDS).until({
+            def r = sql """select status, ErrorMsg from jobs("type"="insert") 
where Name='${jobName}'"""
+            if (r.size() != 1) {
+                return false
+            }
+            def status = r.get(0).get(0)
+            def errMsg = r.get(0).get(1)
+            log.info("waiting slot-invalidated PAUSED: status=${status} 
errMsg=${errMsg}")
+            status == "PAUSED" && errMsg != null && 
errMsg.toString().contains("Replication slot invalidated")
+        })
+
+        // ===== Phase 4: CANNOT_RESUME — must stay PAUSED, not auto-resumed 
to RUNNING =====
+        for (int i = 0; i < 8; i++) {
+            sleep(5000)
+            def r = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            assert r.size() == 1 && r.get(0).get(0) == "PAUSED" :
+                    "job must stay PAUSED after slot invalidation, got ${r}"
+        }
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            def slotLeft = sql """SELECT COUNT(1) FROM pg_replication_slots 
WHERE slot_name = '${userSlot}'"""
+            if (slotLeft[0][0] != 0) {
+                sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+            }
+            sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+        }
+        sql """drop table if exists ${currentDb}.${table1} force"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to