This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 2eeb5d204c1 branch-4.1: [improve](streaming-job) add from-to cdc 
WAL-search timeout and stale-reader release #64013 (#64112)
2eeb5d204c1 is described below

commit 2eeb5d204c19d5b2b6d7d736578603ddf7363cc8
Author: wudi <[email protected]>
AuthorDate: Tue Jun 9 11:12:44 2026 +0800

    branch-4.1: [improve](streaming-job) add from-to cdc WAL-search timeout and 
stale-reader release #64013 (#64112)
    
    Cherry-picked from #64013
---
 .../doris/job/cdc/request/WriteRecordRequest.java  |  1 +
 .../insert/streaming/StreamingMultiTblTask.java    | 54 ++++++++++++++++--
 .../org/apache/doris/cdcclient/common/Env.java     | 65 ++++++++++++++++++++++
 .../cdcclient/controller/ClientController.java     | 21 +++++++
 .../cdcclient/service/PipelineCoordinator.java     | 23 +++++++-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java | 15 +++--
 .../apache/doris/cdcclient/sink/RespContent.java   |  7 +++
 .../source/reader/AbstractCdcSourceReader.java     |  8 +++
 .../cdcclient/source/reader/SourceReader.java      |  5 ++
 .../org/apache/doris/cdcclient/common/EnvTest.java | 28 ++++++----
 .../doris/cdcclient/sink/RespContentTest.java      | 55 ++++++++++++++++++
 .../source/reader/AbstractCdcSourceReaderTest.java | 13 +++++
 .../cdc/test_streaming_mysql_job_errormsg.groovy   |  2 +
 13 files changed, 274 insertions(+), 23 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index fd56518643d..511e4fcea74 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 public class WriteRecordRequest extends JobBaseRecordRequest {
     private long maxInterval;
+    private long taskTimeoutMs;
     private String targetDb;
     private String token;
     private String taskId;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index c257613749d..9077ad01a82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.base.Job;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
 import org.apache.doris.job.cdc.split.SnapshotSplit;
@@ -208,6 +209,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
         request.setFrontendAddress(feAddr);
         request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+        request.setTaskTimeoutMs(getTaskTimeoutMs());
         if (offsetProvider instanceof JdbcSourceOffsetProvider) {
             String schemas = ((JdbcSourceOffsetProvider) 
offsetProvider).getTableSchemas();
             if (schemas != null) {
@@ -306,18 +308,57 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
 
     @Override
     protected void onFail(String errMsg) throws JobException {
+        // Release this task's reader before reschedule so it stops competing 
for the shared slot.
+        releaseRemoteReader();
         super.onFail(errMsg);
     }
 
     @Override
     public void cancel(boolean needWaitCancelComplete) {
-        // No manual cancellation is required; the task ID will be checked for 
consistency in the beforeCommit function.
+        // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing 
would orphan the engine.
         super.cancel(needWaitCancelComplete);
     }
 
     @Override
     public void closeOrReleaseResources() {
-        // no need
+        // No-op: reader is shared across tasks; release on reschedule is done 
in onFail().
+    }
+
+    /**
+     * Best-effort, onFail reschedule only: stop this job's reader on {@link 
#runningBackendId} so a
+     * reschedule to another backend never leaves two readers competing for 
the same source (e.g. one
+     * PG replication slot, which is kept, not dropped). Failures are 
swallowed and must not block
+     * rescheduling.
+     */
+    public void releaseRemoteReader() {
+        if (runningBackendId <= 0) {
+            return;
+        }
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(runningBackendId);
+        if (backend == null) {
+            log.info("Skip releasing remote reader: backend {} not found, job 
{} task {}",
+                    runningBackendId, getJobId(), getTaskId());
+            return;
+        }
+        try {
+            JobBaseConfig releaseParams = new JobBaseConfig(
+                    String.valueOf(getJobId()), dataSourceType.name(), 
sourceProperties, getFrontendAddress());
+            InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
+                    .setApi("/api/releaseReader/" + getTaskId())
+                    .setParams(new Gson().toJson(releaseParams)).build();
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
+            // Fire-and-forget: this runs under the job write lock, so never 
block on the result.
+            BackendServiceProxy.getInstance()
+                    .requestCdcClient(address, request, 
Config.streaming_cdc_light_rpc_timeout_sec);
+            log.info("Sent release reader request to backend {}:{} for job {} 
task {}",
+                    backend.getHost(), backend.getBrpcPort(), getJobId(), 
getTaskId());
+        } catch (Exception ex) {
+            log.warn("Release remote reader request failed for job {} task {}: 
", getJobId(), getTaskId(), ex);
+        }
+    }
+
+    private String getFrontendAddress() {
+        return Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
     }
 
     public boolean isTimeout() {
@@ -325,9 +366,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
             // It's still pending, waiting for scheduling.
             return false;
         }
-        // Read multiplier live so config changes affect already-running tasks.
-        long timeoutMs = Config.streaming_task_timeout_multiplier
-                * jobProperties.getMaxIntervalSecond() * 1000L;
+        long timeoutMs = getTaskTimeoutMs();
         long elapsed = System.currentTimeMillis() - startTimeMs;
         if (elapsed > timeoutMs) {
             log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", 
taskId, elapsed, timeoutMs);
@@ -336,6 +375,11 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         return false;
     }
 
+    // Read multiplier live so config changes affect already-running tasks.
+    private long getTaskTimeoutMs() {
+        return Config.streaming_task_timeout_multiplier * 
jobProperties.getMaxIntervalSecond() * 1000L;
+    }
+
     /**
      * When a task encounters a write error, it will time out.
      * The job needs to obtain the reason for the timeout,
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 79d5c65cf57..28da598053b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -73,6 +73,70 @@ public class Env {
         return manager.getOrCreateReader(jobConfig.getJobId(), ds, 
jobConfig.getConfig());
     }
 
+    /** Return the reader only if already created, else null (never creates 
one). */
+    public SourceReader getReaderIfPresent(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        return context == null ? null : context.reader;
+    }
+
+    /**
+     * Get-or-create this job's reader and claim ownership for {@code taskId} 
atomically under the
+     * per-job lock, so a concurrent stale release cannot stop a reader this 
task is about to use.
+     */
+    public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String 
taskId) {
+        if (jobConfig.getFrontendAddress() != null && 
!jobConfig.getFrontendAddress().isEmpty()) {
+            this.feMasterAddress = jobConfig.getFrontendAddress();
+        }
+        DataSource ds = resolveDataSource(jobConfig.getDataSource());
+        String jobId = jobConfig.getJobId();
+        Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+        lock.lock();
+        try {
+            JobContext context = jobContexts.get(jobId);
+            if (context == null) {
+                LOG.info("Creating new reader for job {}, dataSource {}", 
jobId, ds);
+                context = new JobContext(jobId, ds, jobConfig.getConfig());
+                context.initializeReader();
+                jobContexts.put(jobId, context);
+            }
+            context.ownerTaskId = taskId;
+            return context.getReader(ds);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * If {@code taskId} still owns the reader, remove the context under the 
per-job lock and return
+     * the reader to release; else null (stale release -> no-op). Removing 
under the lock guarantees
+     * a racing {@link #getReaderAndClaim} either sees the new owner (no-op) 
or rebuilds a fresh
+     * one.
+     */
+    public SourceReader detachReaderIfOwner(String jobId, String taskId) {
+        Lock lock = jobLocks.get(jobId);
+        if (lock == null) {
+            return null;
+        }
+        lock.lock();
+        try {
+            JobContext context = jobContexts.get(jobId);
+            if (context == null || !Objects.equals(context.ownerTaskId, 
taskId)) {
+                if (context != null) {
+                    LOG.info(
+                            "Stale release for job {} task {} (owner {}), 
skip",
+                            jobId,
+                            taskId,
+                            context.ownerTaskId);
+                }
+                return null;
+            }
+            jobContexts.remove(jobId);
+            return context.reader;
+        } finally {
+            lock.unlock();
+        }
+    }
+
     private DataSource resolveDataSource(String source) {
         if (source == null || source.trim().isEmpty()) {
             throw new IllegalArgumentException("Missing dataSource");
@@ -131,6 +195,7 @@ public class Env {
     private static final class JobContext {
         private final String jobId;
         private volatile SourceReader reader;
+        private volatile String ownerTaskId;
         private volatile Map<String, String> config;
         private volatile DataSource dataSource;
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 0dd3367abe4..22509a55e98 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -138,6 +138,27 @@ public class ClientController {
         return RestResponse.success(true);
     }
 
+    /** Release a job's reader on this backend: stop engine, keep the 
replication slot. */
+    @RequestMapping(path = "/api/releaseReader/{taskId}", method = 
RequestMethod.POST)
+    public Object releaseReader(
+            @PathVariable("taskId") String taskId, @RequestBody JobBaseConfig 
jobConfig) {
+        LOG.info("Releasing reader (keep slot) for job {} task {}", 
jobConfig.getJobId(), taskId);
+        Env env = Env.getCurrentEnv();
+        // Only the owning task may release; detach removes the context under 
the per-job lock so a
+        // racing claim rebuilds a fresh reader, and a stale RPC is a no-op.
+        SourceReader reader = env.detachReaderIfOwner(jobConfig.getJobId(), 
taskId);
+        if (reader == null) {
+            LOG.info(
+                    "No owned reader for job {} task {}, skip release",
+                    jobConfig.getJobId(),
+                    taskId);
+            return RestResponse.success(true);
+        }
+        // Upstream-only: stop engine, keep slot. Loader is job-scoped, 
cleaned up by /api/close.
+        reader.release(jobConfig);
+        return RestResponse.success(true);
+    }
+
     /** get task fail reason */
     @RequestMapping(path = "/api/getFailReason/{taskId}", method = 
RequestMethod.POST)
     public Object getFailReason(@PathVariable("taskId") String taskId) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index d33f48f38f8..ebbd9c4acf1 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -443,7 +443,11 @@ public class PipelineCoordinator {
         Map<String, String> targetTableMappings =
                 
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
 
-        SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
+        // Get-or-create the reader and claim ownership atomically, so a 
concurrent stale
+        // releaseReader RPC cannot stop the reader this task is about to use.
+        SourceReader sourceReader =
+                Env.getCurrentEnv()
+                        .getReaderAndClaim(writeRecordRequest, 
writeRecordRequest.getTaskId());
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
         int heartbeatCount = 0;
@@ -459,6 +463,8 @@ public class PipelineCoordinator {
             long startTime = System.currentTimeMillis();
             long streamingStartTime = -1;
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            // Half the FE task timeout; exit setup phase before FE 
force-kills. 0 disables.
+            long searchTimeoutMs = writeRecordRequest.getTaskTimeoutMs() / 2;
             boolean shouldStop = false;
             boolean lastMessageIsHeartbeat = false;
 
@@ -476,6 +482,21 @@ public class PipelineCoordinator {
                 if (!recordIterator.hasNext()) {
                     Thread.sleep(100);
 
+                    // Stream-split setup stuck (WAL search / idle): bail out; 
snapshot has its own
+                    // completion logic.
+                    if (!isSnapshotSplit
+                            && streamingStartTime < 0
+                            && searchTimeoutMs > 0
+                            && System.currentTimeMillis() - startTime > 
searchTimeoutMs) {
+                        LOG.warn(
+                                "Streaming not started within {} ms for 
jobId={} taskId={}, "
+                                        + "stopping to commit offset",
+                                searchTimeoutMs,
+                                writeRecordRequest.getJobId(),
+                                writeRecordRequest.getTaskId());
+                        break;
+                    }
+
                     // Check if should stop
                     long elapsedTime =
                             streamingStartTime > 0
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 81e787b13ea..b1d1cd8ba03 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -444,11 +444,16 @@ public class DorisBatchStreamLoad implements Serializable 
{
                                                     loadResult);
                                     throw new StreamLoadException(errMsg);
                                 } else {
-                                    errMsg =
-                                            String.format(
-                                                    "stream load error: %s, 
see more in %s",
-                                                    respContent.getMessage(),
-                                                    respContent.getErrorURL());
+                                    // Carry FirstErrorMsg (the first rejected 
row detail) so the
+                                    // task error surfaced to FE is 
actionable, not just an URL.
+                                    StringBuilder msg = new 
StringBuilder("stream load error: ");
+                                    msg.append(respContent.getMessage());
+                                    if 
(StringUtils.isNotBlank(respContent.getFirstErrorMsg())) {
+                                        msg.append(", first_error_msg: ")
+                                                
.append(respContent.getFirstErrorMsg());
+                                    }
+                                    msg.append(", see more in 
").append(respContent.getErrorURL());
+                                    errMsg = msg.toString();
                                 }
                                 throw new StreamLoadException(errMsg);
                             }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
index 35327aa4553..b5de06f9621 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
@@ -81,6 +81,9 @@ public class RespContent {
     @JsonProperty(value = "ErrorURL")
     private String errorURL;
 
+    @JsonProperty(value = "FirstErrorMsg")
+    private String firstErrorMsg;
+
     public Long getTxnId() {
         return txnId;
     }
@@ -167,4 +170,8 @@ public class RespContent {
     public String getErrorURL() {
         return errorURL;
     }
+
+    public String getFirstErrorMsg() {
+        return firstErrorMsg;
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 115e617af98..74eb534bc5f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -68,6 +68,14 @@ public abstract class AbstractCdcSourceReader implements 
SourceReader {
 
     private final Map<String, Class<?>> splitKeyClassCache = new 
ConcurrentHashMap<>();
 
+    @Override
+    public synchronized void release(JobBaseConfig jobConfig) {
+        // Stop the engine but keep source-side state (e.g. the PG replication 
slot) for another
+        // backend to take over.
+        LOG.info("Release source reader for job {}", jobConfig.getJobId());
+        finishSplitRecords();
+    }
+
     protected abstract Class<?> probeSplitKeyClass(
             TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index b41f66f89fb..c51572b377d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -79,6 +79,11 @@ public interface SourceReader {
     /** Called when closing */
     void close(JobBaseConfig jobConfig);
 
+    /**
+     * Stop the reader engine and free its replication-slot connection, but 
keep the slot itself.
+     */
+    void release(JobBaseConfig jobConfig);
+
     DeserializeResult deserialize(Map<String, String> config, SourceRecord 
element)
             throws IOException;
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
similarity index 55%
copy from 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
copy to 
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
index fd56518643d..685beb29f8a 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
@@ -15,19 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.common;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
-import java.util.Map;
+import org.junit.jupiter.api.Test;
 
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class WriteRecordRequest extends JobBaseRecordRequest {
-    private long maxInterval;
-    private String targetDb;
-    private String token;
-    private String taskId;
-    private Map<String, String> streamLoadProps;
+class EnvTest {
+
+    @Test
+    void getReaderIfPresentReturnsNullForUnknownJob() {
+        // An off-target releaseReader RPC must be a no-op, never create a 
reader -> peek returns null.
+        assertNull(Env.getCurrentEnv().getReaderIfPresent("no-such-job-id"));
+    }
+
+    @Test
+    void detachReaderIfOwnerReturnsNullForUnknownJob() {
+        // Stale release for an unknown job (no lock/context) must be a no-op.
+        assertNull(Env.getCurrentEnv().detachReaderIfOwner("no-such-job-id", 
"t1"));
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
new file mode 100644
index 00000000000..5765fbc786f
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.sink;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class RespContentTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Test
+    void parsesFirstErrorMsgFromFailedLoadResponse() throws Exception {
+        String json =
+                "{\"Status\":\"Fail\","
+                        + "\"Message\":\"[DATA_QUALITY_ERROR]Encountered 
unqualified data.\","
+                        + "\"ErrorURL\":\"https://host/error_log/abc\","
+                        + "\"FirstErrorMsg\":\"column(event_qty) values is 
null while columns is"
+                        + " not nullable.\"}";
+
+        RespContent resp = MAPPER.readValue(json, RespContent.class);
+
+        assertEquals("Fail", resp.getStatus());
+        assertEquals(
+                "column(event_qty) values is null while columns is not 
nullable.",
+                resp.getFirstErrorMsg());
+    }
+
+    @Test
+    void firstErrorMsgIsNullWhenAbsent() throws Exception {
+        // Success responses omit FirstErrorMsg; must not blow up 
deserialization.
+        RespContent resp =
+                MAPPER.readValue("{\"Status\":\"Success\"}", 
RespContent.class);
+        assertNull(resp.getFirstErrorMsg());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
index fbca1ad41e6..af1ffae2902 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.cdcclient.source.reader;
 
+import org.apache.doris.cdcclient.source.reader.mysql.MySqlSourceReader;
+import org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.split.SnapshotSplit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -25,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -38,6 +42,15 @@ class AbstractCdcSourceReaderTest {
 
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
+    @Test
+    void releaseStaysBaseImplSoReplicationSlotIsKept() throws Exception {
+        // release must stay the base impl (close drops the slot, release must 
not) so a reschedule keeps the slot.
+        Method pgRelease = PostgresSourceReader.class.getMethod("release", 
JobBaseConfig.class);
+        assertEquals(AbstractCdcSourceReader.class, 
pgRelease.getDeclaringClass());
+        Method mysqlRelease = MySqlSourceReader.class.getMethod("release", 
JobBaseConfig.class);
+        assertEquals(AbstractCdcSourceReader.class, 
mysqlRelease.getDeclaringClass());
+    }
+
     @Test
     void convertBoundsRestoresDateFromString() {
         Object[] raw = new Object[] {"2025-01-06"};
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
index a23e5efb0bf..e3307d19d65 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -102,6 +102,8 @@ suite("test_streaming_mysql_job_errormsg", 
"p0,external,mysql,external_docker,ex
         log.info("jobFailMsg: " + jobFailMsg)
         // stream load error: [DATA_QUALITY_ERROR]too many filtered rows
         assert jobFailMsg.get(0).get(0).contains("stream load error")
+        // FirstErrorMsg (first rejected row detail) must be surfaced to the 
user, not just an URL
+        assert jobFailMsg.get(0).get(0).contains("first_error_msg")
 
 
         // add max_filter_ratio to 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to