This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 2eeb5d204c1 branch-4.1: [improve](streaming-job) add from-to cdc
WAL-search timeout and stale-reader release #64013 (#64112)
2eeb5d204c1 is described below
commit 2eeb5d204c19d5b2b6d7d736578603ddf7363cc8
Author: wudi <[email protected]>
AuthorDate: Tue Jun 9 11:12:44 2026 +0800
branch-4.1: [improve](streaming-job) add from-to cdc WAL-search timeout and
stale-reader release #64013 (#64112)
Cherry-picked from #64013
---
.../doris/job/cdc/request/WriteRecordRequest.java | 1 +
.../insert/streaming/StreamingMultiTblTask.java | 54 ++++++++++++++++--
.../org/apache/doris/cdcclient/common/Env.java | 65 ++++++++++++++++++++++
.../cdcclient/controller/ClientController.java | 21 +++++++
.../cdcclient/service/PipelineCoordinator.java | 23 +++++++-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 15 +++--
.../apache/doris/cdcclient/sink/RespContent.java | 7 +++
.../source/reader/AbstractCdcSourceReader.java | 8 +++
.../cdcclient/source/reader/SourceReader.java | 5 ++
.../org/apache/doris/cdcclient/common/EnvTest.java | 28 ++++++----
.../doris/cdcclient/sink/RespContentTest.java | 55 ++++++++++++++++++
.../source/reader/AbstractCdcSourceReaderTest.java | 13 +++++
.../cdc/test_streaming_mysql_job_errormsg.groovy | 2 +
13 files changed, 274 insertions(+), 23 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index fd56518643d..511e4fcea74 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -26,6 +26,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
public class WriteRecordRequest extends JobBaseRecordRequest {
private long maxInterval;
+ private long taskTimeoutMs;
private String targetDb;
private String token;
private String taskId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index c257613749d..9077ad01a82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
@@ -208,6 +209,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
String feAddr = Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+ request.setTaskTimeoutMs(getTaskTimeoutMs());
if (offsetProvider instanceof JdbcSourceOffsetProvider) {
String schemas = ((JdbcSourceOffsetProvider)
offsetProvider).getTableSchemas();
if (schemas != null) {
@@ -306,18 +308,57 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
@Override
protected void onFail(String errMsg) throws JobException {
+ // Release this task's reader before reschedule so it stops competing
for the shared slot.
+ releaseRemoteReader();
super.onFail(errMsg);
}
@Override
public void cancel(boolean needWaitCancelComplete) {
- // No manual cancellation is required; the task ID will be checked for
consistency in the beforeCommit function.
+ // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing
would orphan the engine.
super.cancel(needWaitCancelComplete);
}
@Override
public void closeOrReleaseResources() {
- // no need
+ // No-op: reader is shared across tasks; release on reschedule is done
in onFail().
+ }
+
+ /**
+ * Best-effort, onFail reschedule only: stop this job's reader on {@link
#runningBackendId} so a
+ * reschedule to another backend never leaves two readers competing for
the same source (e.g. one
+ * PG replication slot, which is kept, not dropped). Failures are
swallowed and must not block
+ * rescheduling.
+ */
+ public void releaseRemoteReader() {
+ if (runningBackendId <= 0) {
+ return;
+ }
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(runningBackendId);
+ if (backend == null) {
+ log.info("Skip releasing remote reader: backend {} not found, job
{} task {}",
+ runningBackendId, getJobId(), getTaskId());
+ return;
+ }
+ try {
+ JobBaseConfig releaseParams = new JobBaseConfig(
+ String.valueOf(getJobId()), dataSourceType.name(),
sourceProperties, getFrontendAddress());
+ InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/releaseReader/" + getTaskId())
+ .setParams(new Gson().toJson(releaseParams)).build();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
+ // Fire-and-forget: this runs under the job write lock, so never
block on the result.
+ BackendServiceProxy.getInstance()
+ .requestCdcClient(address, request,
Config.streaming_cdc_light_rpc_timeout_sec);
+ log.info("Sent release reader request to backend {}:{} for job {}
task {}",
+ backend.getHost(), backend.getBrpcPort(), getJobId(),
getTaskId());
+ } catch (Exception ex) {
+ log.warn("Release remote reader request failed for job {} task {}:
", getJobId(), getTaskId(), ex);
+ }
+ }
+
+ private String getFrontendAddress() {
+ return Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
}
public boolean isTimeout() {
@@ -325,9 +366,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
// It's still pending, waiting for scheduling.
return false;
}
- // Read multiplier live so config changes affect already-running tasks.
- long timeoutMs = Config.streaming_task_timeout_multiplier
- * jobProperties.getMaxIntervalSecond() * 1000L;
+ long timeoutMs = getTaskTimeoutMs();
long elapsed = System.currentTimeMillis() - startTimeMs;
if (elapsed > timeoutMs) {
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms",
taskId, elapsed, timeoutMs);
@@ -336,6 +375,11 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
return false;
}
+ // Read multiplier live so config changes affect already-running tasks.
+ private long getTaskTimeoutMs() {
+ return Config.streaming_task_timeout_multiplier *
jobProperties.getMaxIntervalSecond() * 1000L;
+ }
+
/**
* When a task encounters a write error, it will time out.
* The job needs to obtain the reason for the timeout,
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 79d5c65cf57..28da598053b 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -73,6 +73,70 @@ public class Env {
return manager.getOrCreateReader(jobConfig.getJobId(), ds,
jobConfig.getConfig());
}
+ /** Return the reader only if already created, else null (never creates
one). */
+ public SourceReader getReaderIfPresent(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ return context == null ? null : context.reader;
+ }
+
+ /**
+ * Get-or-create this job's reader and claim ownership for {@code taskId}
atomically under the
+ * per-job lock, so a concurrent stale release cannot stop a reader this
task is about to use.
+ */
+ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String
taskId) {
+ if (jobConfig.getFrontendAddress() != null &&
!jobConfig.getFrontendAddress().isEmpty()) {
+ this.feMasterAddress = jobConfig.getFrontendAddress();
+ }
+ DataSource ds = resolveDataSource(jobConfig.getDataSource());
+ String jobId = jobConfig.getJobId();
+ Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+ lock.lock();
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null) {
+ LOG.info("Creating new reader for job {}, dataSource {}",
jobId, ds);
+ context = new JobContext(jobId, ds, jobConfig.getConfig());
+ context.initializeReader();
+ jobContexts.put(jobId, context);
+ }
+ context.ownerTaskId = taskId;
+ return context.getReader(ds);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * If {@code taskId} still owns the reader, remove the context under the
per-job lock and return
+ * the reader to release; else null (stale release -> no-op). Removing
under the lock guarantees
+ * a racing {@link #getReaderAndClaim} either sees the new owner (no-op)
or rebuilds a fresh
+ * one.
+ */
+ public SourceReader detachReaderIfOwner(String jobId, String taskId) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null) {
+ return null;
+ }
+ lock.lock();
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || !Objects.equals(context.ownerTaskId,
taskId)) {
+ if (context != null) {
+ LOG.info(
+ "Stale release for job {} task {} (owner {}),
skip",
+ jobId,
+ taskId,
+ context.ownerTaskId);
+ }
+ return null;
+ }
+ jobContexts.remove(jobId);
+ return context.reader;
+ } finally {
+ lock.unlock();
+ }
+ }
+
private DataSource resolveDataSource(String source) {
if (source == null || source.trim().isEmpty()) {
throw new IllegalArgumentException("Missing dataSource");
@@ -131,6 +195,7 @@ public class Env {
private static final class JobContext {
private final String jobId;
private volatile SourceReader reader;
+ private volatile String ownerTaskId;
private volatile Map<String, String> config;
private volatile DataSource dataSource;
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 0dd3367abe4..22509a55e98 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -138,6 +138,27 @@ public class ClientController {
return RestResponse.success(true);
}
+ /** Release a job's reader on this backend: stop engine, keep the
replication slot. */
+ @RequestMapping(path = "/api/releaseReader/{taskId}", method =
RequestMethod.POST)
+ public Object releaseReader(
+ @PathVariable("taskId") String taskId, @RequestBody JobBaseConfig
jobConfig) {
+ LOG.info("Releasing reader (keep slot) for job {} task {}",
jobConfig.getJobId(), taskId);
+ Env env = Env.getCurrentEnv();
+ // Only the owning task may release; detach removes the context under
the per-job lock so a
+ // racing claim rebuilds a fresh reader, and a stale RPC is a no-op.
+ SourceReader reader = env.detachReaderIfOwner(jobConfig.getJobId(),
taskId);
+ if (reader == null) {
+ LOG.info(
+ "No owned reader for job {} task {}, skip release",
+ jobConfig.getJobId(),
+ taskId);
+ return RestResponse.success(true);
+ }
+ // Upstream-only: stop engine, keep slot. Loader is job-scoped,
cleaned up by /api/close.
+ reader.release(jobConfig);
+ return RestResponse.success(true);
+ }
+
/** get task fail reason */
@RequestMapping(path = "/api/getFailReason/{taskId}", method =
RequestMethod.POST)
public Object getFailReason(@PathVariable("taskId") String taskId) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index d33f48f38f8..ebbd9c4acf1 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -443,7 +443,11 @@ public class PipelineCoordinator {
Map<String, String> targetTableMappings =
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
- SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
+ // Get-or-create the reader and claim ownership atomically, so a
concurrent stale
+ // releaseReader RPC cannot stop the reader this task is about to use.
+ SourceReader sourceReader =
+ Env.getCurrentEnv()
+ .getReaderAndClaim(writeRecordRequest,
writeRecordRequest.getTaskId());
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
int heartbeatCount = 0;
@@ -459,6 +463,8 @@ public class PipelineCoordinator {
long startTime = System.currentTimeMillis();
long streamingStartTime = -1;
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ // Half the FE task timeout; exit setup phase before FE
force-kills. 0 disables.
+ long searchTimeoutMs = writeRecordRequest.getTaskTimeoutMs() / 2;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
@@ -476,6 +482,21 @@ public class PipelineCoordinator {
if (!recordIterator.hasNext()) {
Thread.sleep(100);
+ // Stream-split setup stuck (WAL search / idle): bail out;
snapshot has its own
+ // completion logic.
+ if (!isSnapshotSplit
+ && streamingStartTime < 0
+ && searchTimeoutMs > 0
+ && System.currentTimeMillis() - startTime >
searchTimeoutMs) {
+ LOG.warn(
+ "Streaming not started within {} ms for
jobId={} taskId={}, "
+ + "stopping to commit offset",
+ searchTimeoutMs,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+ break;
+ }
+
// Check if should stop
long elapsedTime =
streamingStartTime > 0
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 81e787b13ea..b1d1cd8ba03 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -444,11 +444,16 @@ public class DorisBatchStreamLoad implements Serializable {
loadResult);
throw new StreamLoadException(errMsg);
} else {
- errMsg =
- String.format(
- "stream load error: %s,
see more in %s",
- respContent.getMessage(),
- respContent.getErrorURL());
+ // Carry FirstErrorMsg (the first rejected
row detail) so the
+ // task error surfaced to FE is
actionable, not just an URL.
+ StringBuilder msg = new
StringBuilder("stream load error: ");
+ msg.append(respContent.getMessage());
+ if
(StringUtils.isNotBlank(respContent.getFirstErrorMsg())) {
+ msg.append(", first_error_msg: ")
+
.append(respContent.getFirstErrorMsg());
+ }
+ msg.append(", see more in
").append(respContent.getErrorURL());
+ errMsg = msg.toString();
}
throw new StreamLoadException(errMsg);
}
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
index 35327aa4553..b5de06f9621 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
@@ -81,6 +81,9 @@ public class RespContent {
@JsonProperty(value = "ErrorURL")
private String errorURL;
+ @JsonProperty(value = "FirstErrorMsg")
+ private String firstErrorMsg;
+
public Long getTxnId() {
return txnId;
}
@@ -167,4 +170,8 @@ public class RespContent {
public String getErrorURL() {
return errorURL;
}
+
+ public String getFirstErrorMsg() {
+ return firstErrorMsg;
+ }
}
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 115e617af98..74eb534bc5f 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -68,6 +68,14 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
private final Map<String, Class<?>> splitKeyClassCache = new
ConcurrentHashMap<>();
+ @Override
+ public synchronized void release(JobBaseConfig jobConfig) {
+ // Stop the engine but keep source-side state (e.g. the PG replication
slot) for another
+ // backend to take over.
+ LOG.info("Release source reader for job {}", jobConfig.getJobId());
+ finishSplitRecords();
+ }
+
protected abstract Class<?> probeSplitKeyClass(
TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index b41f66f89fb..c51572b377d 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -79,6 +79,11 @@ public interface SourceReader {
/** Called when closing */
void close(JobBaseConfig jobConfig);
+ /**
+ * Stop the reader engine and free its replication-slot connection, but
keep the slot itself.
+ */
+ void release(JobBaseConfig jobConfig);
+
DeserializeResult deserialize(Map<String, String> config, SourceRecord
element)
throws IOException;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
similarity index 55%
copy from fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
copy to fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
index fd56518643d..685beb29f8a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
@@ -15,19 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.common;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import static org.junit.jupiter.api.Assertions.assertNull;
-import java.util.Map;
+import org.junit.jupiter.api.Test;
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class WriteRecordRequest extends JobBaseRecordRequest {
- private long maxInterval;
- private String targetDb;
- private String token;
- private String taskId;
- private Map<String, String> streamLoadProps;
+class EnvTest {
+
+ @Test
+ void getReaderIfPresentReturnsNullForUnknownJob() {
+ // An off-target releaseReader RPC must be a no-op, never create a
reader -> peek returns null.
+ assertNull(Env.getCurrentEnv().getReaderIfPresent("no-such-job-id"));
+ }
+
+ @Test
+ void detachReaderIfOwnerReturnsNullForUnknownJob() {
+ // Stale release for an unknown job (no lock/context) must be a no-op.
+ assertNull(Env.getCurrentEnv().detachReaderIfOwner("no-such-job-id",
"t1"));
+ }
}
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
new file mode 100644
index 00000000000..5765fbc786f
--- /dev/null
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.sink;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class RespContentTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ void parsesFirstErrorMsgFromFailedLoadResponse() throws Exception {
+ String json =
+ "{\"Status\":\"Fail\","
+ + "\"Message\":\"[DATA_QUALITY_ERROR]Encountered
unqualified data.\","
+ + "\"ErrorURL\":\"https://host/error_log/abc\","
+ + "\"FirstErrorMsg\":\"column(event_qty) values is
null while columns is"
+ + " not nullable.\"}";
+
+ RespContent resp = MAPPER.readValue(json, RespContent.class);
+
+ assertEquals("Fail", resp.getStatus());
+ assertEquals(
+ "column(event_qty) values is null while columns is not
nullable.",
+ resp.getFirstErrorMsg());
+ }
+
+ @Test
+ void firstErrorMsgIsNullWhenAbsent() throws Exception {
+ // Success responses omit FirstErrorMsg; must not blow up
deserialization.
+ RespContent resp =
+ MAPPER.readValue("{\"Status\":\"Success\"}",
RespContent.class);
+ assertNull(resp.getFirstErrorMsg());
+ }
+}
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
index fbca1ad41e6..af1ffae2902 100644
--- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -17,6 +17,9 @@
package org.apache.doris.cdcclient.source.reader;
+import org.apache.doris.cdcclient.source.reader.mysql.MySqlSourceReader;
+import org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -25,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
@@ -38,6 +42,15 @@ class AbstractCdcSourceReaderTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
+ @Test
+ void releaseStaysBaseImplSoReplicationSlotIsKept() throws Exception {
+ // release must stay the base impl (close drops the slot, release must
not) so a reschedule keeps the slot.
+ Method pgRelease = PostgresSourceReader.class.getMethod("release",
JobBaseConfig.class);
+ assertEquals(AbstractCdcSourceReader.class,
pgRelease.getDeclaringClass());
+ Method mysqlRelease = MySqlSourceReader.class.getMethod("release",
JobBaseConfig.class);
+ assertEquals(AbstractCdcSourceReader.class,
mysqlRelease.getDeclaringClass());
+ }
+
@Test
void convertBoundsRestoresDateFromString() {
Object[] raw = new Object[] {"2025-01-06"};
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
index a23e5efb0bf..e3307d19d65 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -102,6 +102,8 @@ suite("test_streaming_mysql_job_errormsg", "p0,external,mysql,external_docker,ex
log.info("jobFailMsg: " + jobFailMsg)
// stream load error: [DATA_QUALITY_ERROR]too many filtered rows
assert jobFailMsg.get(0).get(0).contains("stream load error")
+ // FirstErrorMsg (first rejected row detail) must be surfaced to the
user, not just an URL
+ assert jobFailMsg.get(0).get(0).contains("first_error_msg")
// add max_filter_ratio to 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org