This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4a9d7d1f9a9 [improve](streaming-job) add from-to cdc WAL-search
timeout and stale-reader release (#64013)
4a9d7d1f9a9 is described below
commit 4a9d7d1f9a9cd09edc2ff3cc36e49657a4fa52d8
Author: wudi <[email protected]>
AuthorDate: Thu Jun 4 15:53:58 2026 +0800
[improve](streaming-job) add from-to cdc WAL-search timeout and
stale-reader release (#64013)
## Proposed changes
Three reliability/observability fixes for from-to (at-least-once) CDC
streaming tasks.
1. **Startup timeout.** A from-to binlog task whose upstream is idle could block indefinitely in the replication startup/locate phase: no first message ever arrives, so the poll loop never times out. This adds a setup-phase timeout — half of the FE task timeout, passed down via `WriteRecordRequest.taskTimeoutMs` — so the task exits and commits the current offset gracefully instead of hanging. The timeout only applies while streaming has not yet started, and a value of 0 disables it. Snapshot splits are explicitly excluded so an incomplete watermark is never committed.
2. **Release a stale reader on failure, guarded by task ownership.** On task `onFail` (but not on `cancel`, since DROP/STOP/PAUSE already clean up via `/api/close`), FE makes a best-effort request (`/api/releaseReader/{taskId}`) asking the previous backend to stop its reader while keeping the replication slot. This way a reschedule to another backend does not leave two readers competing for the same slot. The reader cache is keyed by job id and reused across tasks, so the release carries the failing task id. The backend releases the reader only if that task still owns it; a stale or late RPC becomes a no-op and cannot interrupt a replacement task that reused the same reader. The RPC is fire-and-forget, so it never blocks while the job lock is held.
3. **Surface the first rejected-row detail in the task error.** When a stream load fails with a data-quality error, the cdc_client now parses the `FirstErrorMsg` field from the response. When that field is present, the client appends it (as `first_error_msg: ...`) to the task error reported to FE. As a result, the job `errorMsg` shows the actual offending row instead of only an `ErrorURL`.
Known limitation: the release is best-effort, so a reschedule may briefly hit a "replication slot is active" error. This self-heals via task retry or the source-side sender timeout.
## Further comments
Scoped to the from-to streaming path; snapshot and TVF paths are
unaffected.
---
.../doris/job/cdc/request/WriteRecordRequest.java | 1 +
.../insert/streaming/StreamingMultiTblTask.java | 54 ++++++++++++++++--
.../org/apache/doris/cdcclient/common/Env.java | 65 ++++++++++++++++++++++
.../cdcclient/controller/ClientController.java | 21 +++++++
.../cdcclient/service/PipelineCoordinator.java | 23 +++++++-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 15 +++--
.../apache/doris/cdcclient/sink/RespContent.java | 7 +++
.../source/reader/AbstractCdcSourceReader.java | 8 +++
.../cdcclient/source/reader/SourceReader.java | 5 ++
.../org/apache/doris/cdcclient/common/EnvTest.java | 28 ++++++----
.../doris/cdcclient/sink/RespContentTest.java | 55 ++++++++++++++++++
.../source/reader/AbstractCdcSourceReaderTest.java | 13 +++++
.../cdc/test_streaming_mysql_job_errormsg.groovy | 2 +
13 files changed, 274 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index fd56518643d..511e4fcea74 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -26,6 +26,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
public class WriteRecordRequest extends JobBaseRecordRequest {
private long maxInterval;
+ private long taskTimeoutMs;
private String targetDb;
private String token;
private String taskId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index c257613749d..9077ad01a82 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
@@ -208,6 +209,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
String feAddr = Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+ request.setTaskTimeoutMs(getTaskTimeoutMs());
if (offsetProvider instanceof JdbcSourceOffsetProvider) {
String schemas = ((JdbcSourceOffsetProvider)
offsetProvider).getTableSchemas();
if (schemas != null) {
@@ -306,18 +308,57 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
@Override
protected void onFail(String errMsg) throws JobException {
+ // Release this task's reader before reschedule so it stops competing
for the shared slot.
+ releaseRemoteReader();
super.onFail(errMsg);
}
@Override
public void cancel(boolean needWaitCancelComplete) {
- // No manual cancellation is required; the task ID will be checked for
consistency in the beforeCommit function.
+ // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing
would orphan the engine.
super.cancel(needWaitCancelComplete);
}
@Override
public void closeOrReleaseResources() {
- // no need
+ // No-op: reader is shared across tasks; release on reschedule is done
in onFail().
+ }
+
+ /**
+ * Best-effort, onFail reschedule only: stop this job's reader on {@link
#runningBackendId} so a
+ * reschedule to another backend never leaves two readers competing for
the same source (e.g. one
+ * PG replication slot, which is kept, not dropped). Failures are
swallowed and must not block
+ * rescheduling.
+ */
+ public void releaseRemoteReader() {
+ if (runningBackendId <= 0) {
+ return;
+ }
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(runningBackendId);
+ if (backend == null) {
+ log.info("Skip releasing remote reader: backend {} not found, job
{} task {}",
+ runningBackendId, getJobId(), getTaskId());
+ return;
+ }
+ try {
+ JobBaseConfig releaseParams = new JobBaseConfig(
+ String.valueOf(getJobId()), dataSourceType.name(),
sourceProperties, getFrontendAddress());
+ InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/releaseReader/" + getTaskId())
+ .setParams(new Gson().toJson(releaseParams)).build();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
+ // Fire-and-forget: this runs under the job write lock, so never
block on the result.
+ BackendServiceProxy.getInstance()
+ .requestCdcClient(address, request,
Config.streaming_cdc_light_rpc_timeout_sec);
+ log.info("Sent release reader request to backend {}:{} for job {}
task {}",
+ backend.getHost(), backend.getBrpcPort(), getJobId(),
getTaskId());
+ } catch (Exception ex) {
+ log.warn("Release remote reader request failed for job {} task {}:
", getJobId(), getTaskId(), ex);
+ }
+ }
+
+ private String getFrontendAddress() {
+ return Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
}
public boolean isTimeout() {
@@ -325,9 +366,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
// It's still pending, waiting for scheduling.
return false;
}
- // Read multiplier live so config changes affect already-running tasks.
- long timeoutMs = Config.streaming_task_timeout_multiplier
- * jobProperties.getMaxIntervalSecond() * 1000L;
+ long timeoutMs = getTaskTimeoutMs();
long elapsed = System.currentTimeMillis() - startTimeMs;
if (elapsed > timeoutMs) {
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms",
taskId, elapsed, timeoutMs);
@@ -336,6 +375,11 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
return false;
}
+ // Read multiplier live so config changes affect already-running tasks.
+ private long getTaskTimeoutMs() {
+ return Config.streaming_task_timeout_multiplier *
jobProperties.getMaxIntervalSecond() * 1000L;
+ }
+
/**
* When a task encounters a write error, it will time out.
* The job needs to obtain the reason for the timeout,
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 79d5c65cf57..28da598053b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -73,6 +73,70 @@ public class Env {
return manager.getOrCreateReader(jobConfig.getJobId(), ds,
jobConfig.getConfig());
}
+ /** Return the reader only if already created, else null (never creates
one). */
+ public SourceReader getReaderIfPresent(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ return context == null ? null : context.reader;
+ }
+
+ /**
+ * Get-or-create this job's reader and claim ownership for {@code taskId}
atomically under the
+ * per-job lock, so a concurrent stale release cannot stop a reader this
task is about to use.
+ */
+ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String
taskId) {
+ if (jobConfig.getFrontendAddress() != null &&
!jobConfig.getFrontendAddress().isEmpty()) {
+ this.feMasterAddress = jobConfig.getFrontendAddress();
+ }
+ DataSource ds = resolveDataSource(jobConfig.getDataSource());
+ String jobId = jobConfig.getJobId();
+ Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+ lock.lock();
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null) {
+ LOG.info("Creating new reader for job {}, dataSource {}",
jobId, ds);
+ context = new JobContext(jobId, ds, jobConfig.getConfig());
+ context.initializeReader();
+ jobContexts.put(jobId, context);
+ }
+ context.ownerTaskId = taskId;
+ return context.getReader(ds);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * If {@code taskId} still owns the reader, remove the context under the
per-job lock and return
+ * the reader to release; else null (stale release -> no-op). Removing
under the lock guarantees
+ * a racing {@link #getReaderAndClaim} either sees the new owner (no-op)
or rebuilds a fresh
+ * one.
+ */
+ public SourceReader detachReaderIfOwner(String jobId, String taskId) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null) {
+ return null;
+ }
+ lock.lock();
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || !Objects.equals(context.ownerTaskId,
taskId)) {
+ if (context != null) {
+ LOG.info(
+ "Stale release for job {} task {} (owner {}),
skip",
+ jobId,
+ taskId,
+ context.ownerTaskId);
+ }
+ return null;
+ }
+ jobContexts.remove(jobId);
+ return context.reader;
+ } finally {
+ lock.unlock();
+ }
+ }
+
private DataSource resolveDataSource(String source) {
if (source == null || source.trim().isEmpty()) {
throw new IllegalArgumentException("Missing dataSource");
@@ -131,6 +195,7 @@ public class Env {
private static final class JobContext {
private final String jobId;
private volatile SourceReader reader;
+ private volatile String ownerTaskId;
private volatile Map<String, String> config;
private volatile DataSource dataSource;
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 0dd3367abe4..22509a55e98 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -138,6 +138,27 @@ public class ClientController {
return RestResponse.success(true);
}
+ /** Release a job's reader on this backend: stop engine, keep the
replication slot. */
+ @RequestMapping(path = "/api/releaseReader/{taskId}", method =
RequestMethod.POST)
+ public Object releaseReader(
+ @PathVariable("taskId") String taskId, @RequestBody JobBaseConfig
jobConfig) {
+ LOG.info("Releasing reader (keep slot) for job {} task {}",
jobConfig.getJobId(), taskId);
+ Env env = Env.getCurrentEnv();
+ // Only the owning task may release; detach removes the context under
the per-job lock so a
+ // racing claim rebuilds a fresh reader, and a stale RPC is a no-op.
+ SourceReader reader = env.detachReaderIfOwner(jobConfig.getJobId(),
taskId);
+ if (reader == null) {
+ LOG.info(
+ "No owned reader for job {} task {}, skip release",
+ jobConfig.getJobId(),
+ taskId);
+ return RestResponse.success(true);
+ }
+ // Upstream-only: stop engine, keep slot. Loader is job-scoped,
cleaned up by /api/close.
+ reader.release(jobConfig);
+ return RestResponse.success(true);
+ }
+
/** get task fail reason */
@RequestMapping(path = "/api/getFailReason/{taskId}", method =
RequestMethod.POST)
public Object getFailReason(@PathVariable("taskId") String taskId) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 18bd9fcc211..9505290d55e 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -442,7 +442,11 @@ public class PipelineCoordinator {
Map<String, String> targetTableMappings =
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
- SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
+ // Get-or-create the reader and claim ownership atomically, so a
concurrent stale
+ // releaseReader RPC cannot stop the reader this task is about to use.
+ SourceReader sourceReader =
+ Env.getCurrentEnv()
+ .getReaderAndClaim(writeRecordRequest,
writeRecordRequest.getTaskId());
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
int heartbeatCount = 0;
@@ -458,6 +462,8 @@ public class PipelineCoordinator {
long startTime = System.currentTimeMillis();
long streamingStartTime = -1;
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ // Half the FE task timeout; exit setup phase before FE
force-kills. 0 disables.
+ long searchTimeoutMs = writeRecordRequest.getTaskTimeoutMs() / 2;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
@@ -475,6 +481,21 @@ public class PipelineCoordinator {
if (!recordIterator.hasNext()) {
Thread.sleep(100);
+ // Stream-split setup stuck (WAL search / idle): bail out;
snapshot has its own
+ // completion logic.
+ if (!isSnapshotSplit
+ && streamingStartTime < 0
+ && searchTimeoutMs > 0
+ && System.currentTimeMillis() - startTime >
searchTimeoutMs) {
+ LOG.warn(
+ "Streaming not started within {} ms for
jobId={} taskId={}, "
+ + "stopping to commit offset",
+ searchTimeoutMs,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+ break;
+ }
+
// Check if should stop
long elapsedTime =
streamingStartTime > 0
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index d1aa7ccb38f..f853f931646 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -441,11 +441,16 @@ public class DorisBatchStreamLoad implements Serializable
{
loadResult);
throw new StreamLoadException(errMsg);
} else {
- errMsg =
- String.format(
- "stream load error: %s,
see more in %s",
- respContent.getMessage(),
- respContent.getErrorURL());
+ // Carry FirstErrorMsg (the first rejected
row detail) so the
+ // task error surfaced to FE is
actionable, not just an URL.
+ StringBuilder msg = new
StringBuilder("stream load error: ");
+ msg.append(respContent.getMessage());
+ if
(StringUtils.isNotBlank(respContent.getFirstErrorMsg())) {
+ msg.append(", first_error_msg: ")
+
.append(respContent.getFirstErrorMsg());
+ }
+ msg.append(", see more in
").append(respContent.getErrorURL());
+ errMsg = msg.toString();
}
throw new StreamLoadException(errMsg);
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
index 35327aa4553..b5de06f9621 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/RespContent.java
@@ -81,6 +81,9 @@ public class RespContent {
@JsonProperty(value = "ErrorURL")
private String errorURL;
+ @JsonProperty(value = "FirstErrorMsg")
+ private String firstErrorMsg;
+
public Long getTxnId() {
return txnId;
}
@@ -167,4 +170,8 @@ public class RespContent {
public String getErrorURL() {
return errorURL;
}
+
+ public String getFirstErrorMsg() {
+ return firstErrorMsg;
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 438f3ef6556..6c46482dc6d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -68,6 +68,14 @@ public abstract class AbstractCdcSourceReader implements
SourceReader {
private final Map<String, Class<?>> splitKeyClassCache = new
ConcurrentHashMap<>();
+ @Override
+ public synchronized void release(JobBaseConfig jobConfig) {
+ // Stop the engine but keep source-side state (e.g. the PG replication
slot) for another
+ // backend to take over.
+ LOG.info("Release source reader for job {}", jobConfig.getJobId());
+ finishSplitRecords();
+ }
+
protected abstract Class<?> probeSplitKeyClass(
TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index b41f66f89fb..c51572b377d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -79,6 +79,11 @@ public interface SourceReader {
/** Called when closing */
void close(JobBaseConfig jobConfig);
+ /**
+ * Stop the reader engine and free its replication-slot connection, but
keep the slot itself.
+ */
+ void release(JobBaseConfig jobConfig);
+
DeserializeResult deserialize(Map<String, String> config, SourceRecord
element)
throws IOException;
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
similarity index 55%
copy from
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
copy to
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
index fd56518643d..685beb29f8a 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java
@@ -15,19 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.common;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import static org.junit.jupiter.api.Assertions.assertNull;
-import java.util.Map;
+import org.junit.jupiter.api.Test;
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class WriteRecordRequest extends JobBaseRecordRequest {
- private long maxInterval;
- private String targetDb;
- private String token;
- private String taskId;
- private Map<String, String> streamLoadProps;
+class EnvTest {
+
+ @Test
+ void getReaderIfPresentReturnsNullForUnknownJob() {
+ // An off-target releaseReader RPC must be a no-op, never create a
reader -> peek returns null.
+ assertNull(Env.getCurrentEnv().getReaderIfPresent("no-such-job-id"));
+ }
+
+ @Test
+ void detachReaderIfOwnerReturnsNullForUnknownJob() {
+ // Stale release for an unknown job (no lock/context) must be a no-op.
+ assertNull(Env.getCurrentEnv().detachReaderIfOwner("no-such-job-id",
"t1"));
+ }
}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
new file mode 100644
index 00000000000..5765fbc786f
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/sink/RespContentTest.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.sink;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class RespContentTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ void parsesFirstErrorMsgFromFailedLoadResponse() throws Exception {
+ String json =
+ "{\"Status\":\"Fail\","
+ + "\"Message\":\"[DATA_QUALITY_ERROR]Encountered
unqualified data.\","
+ + "\"ErrorURL\":\"https://host/error_log/abc\","
+ + "\"FirstErrorMsg\":\"column(event_qty) values is
null while columns is"
+ + " not nullable.\"}";
+
+ RespContent resp = MAPPER.readValue(json, RespContent.class);
+
+ assertEquals("Fail", resp.getStatus());
+ assertEquals(
+ "column(event_qty) values is null while columns is not
nullable.",
+ resp.getFirstErrorMsg());
+ }
+
+ @Test
+ void firstErrorMsgIsNullWhenAbsent() throws Exception {
+ // Success responses omit FirstErrorMsg; must not blow up
deserialization.
+ RespContent resp =
+ MAPPER.readValue("{\"Status\":\"Success\"}",
RespContent.class);
+ assertNull(resp.getFirstErrorMsg());
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
index fbca1ad41e6..af1ffae2902 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -17,6 +17,9 @@
package org.apache.doris.cdcclient.source.reader;
+import org.apache.doris.cdcclient.source.reader.mysql.MySqlSourceReader;
+import org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -25,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
@@ -38,6 +42,15 @@ class AbstractCdcSourceReaderTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
+ @Test
+ void releaseStaysBaseImplSoReplicationSlotIsKept() throws Exception {
+ // release must stay the base impl (close drops the slot, release must
not) so a reschedule keeps the slot.
+ Method pgRelease = PostgresSourceReader.class.getMethod("release",
JobBaseConfig.class);
+ assertEquals(AbstractCdcSourceReader.class,
pgRelease.getDeclaringClass());
+ Method mysqlRelease = MySqlSourceReader.class.getMethod("release",
JobBaseConfig.class);
+ assertEquals(AbstractCdcSourceReader.class,
mysqlRelease.getDeclaringClass());
+ }
+
@Test
void convertBoundsRestoresDateFromString() {
Object[] raw = new Object[] {"2025-01-06"};
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
index a23e5efb0bf..e3307d19d65 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -102,6 +102,8 @@ suite("test_streaming_mysql_job_errormsg",
"p0,external,mysql,external_docker,ex
log.info("jobFailMsg: " + jobFailMsg)
// stream load error: [DATA_QUALITY_ERROR]too many filtered rows
assert jobFailMsg.get(0).get(0).contains("stream load error")
+ // FirstErrorMsg (first rejected row detail) must be surfaced to the
user, not just an URL
+ assert jobFailMsg.get(0).get(0).contains("first_error_msg")
// add max_filter_ratio to 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]