This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6e27f117471 [improve](streaming-job) avoid potential OOM when reading
large snapshot splits (#63833)
6e27f117471 is described below
commit 6e27f117471f481e13cebabe0454dddc60e5245c
Author: wudi <[email protected]>
AuthorDate: Fri May 29 11:39:11 2026 +0800
[improve](streaming-job) avoid potential OOM when reading large snapshot
splits (#63833)
## Summary
- Default-skip flink-cdc's in-snapshot backfill on the from-to path so
large splits no longer accumulate the entire chunk + backfill stream in
the fetcher's outputBuffer; from-to is at-least-once and tolerates the
duplicates this introduces. TVF (job-driven and standalone) keeps the
standard `false` default for exactly-once via per-task offset commit.
- Expose `skip_snapshot_backfill` as a user-facing property with strict
`true`/`false` validation on both from-to (CREATE JOB) and TVF (SELECT
FROM cdc_stream(...)) entry points.
- Fix snapshot completion under `pollWithoutBuffer`: a split is now
marked complete only after its high-watermark event has been consumed
(`splitState.getHighWatermark() != null`), not on the first non-empty
fetcher batch. Without this, enabling the new default truncates any
split larger than debezium's `max.batch.size` and yields an NPE on
offset extraction.
- Read `streaming_task_timeout_multiplier` live in
`StreamingMultiTblTask.isTimeout()` so `admin set frontend config`
affects already-running tasks, matching the `@ConfField(mutable=true)`
contract.
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 1 +
.../streaming/DataSourceConfigValidator.java | 10 ++
.../insert/streaming/StreamingInsertJob.java | 2 +
.../insert/streaming/StreamingMultiTblTask.java | 5 +-
.../CdcStreamTableValuedFunction.java | 12 ++
.../cdcclient/service/PipelineCoordinator.java | 14 +-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 2 +-
.../source/reader/JdbcIncrementalSourceReader.java | 40 +++++-
.../cdcclient/source/reader/SourceReader.java | 5 +
.../source/reader/mysql/MySqlSourceReader.java | 38 ++++-
.../reader/postgres/PostgresSourceReader.java | 4 +
...t_streaming_mysql_job_snapshot_fat_split.groovy | 153 +++++++++++++++++++++
...treaming_postgres_job_snapshot_fat_split.groovy | 141 +++++++++++++++++++
13 files changed, 418 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 3708e8dc6a3..75617e4a907 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,7 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+ public static final String SKIP_SNAPSHOT_BACKFILL =
"skip_snapshot_backfill";
// MySQL CDC client identity. Single value "5400" or range "5400-5408".
public static final String SERVER_ID = "server_id";
public static final String SSL_MODE = "ssl_mode";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 4ca1e605ef5..1c1dc10c0d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -50,6 +50,7 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.EXCLUDE_TABLES,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+ DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL,
DataSourceConfigKeys.SSL_MODE,
DataSourceConfigKeys.SSL_ROOTCERT,
DataSourceConfigKeys.SLOT_NAME,
@@ -208,12 +209,21 @@ public class DataSourceConfigValidator {
|| key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
return isPositiveInt(value);
}
+ if (key.equals(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)) {
+ return isValidBoolean(value);
+ }
if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
return parseServerIdRange(value) != null;
}
return true;
}
+ // Strict boolean: only "true"/"false" (case-insensitive);
Boolean.parseBoolean would
+ // silently coerce typos like "yes" to false.
+ public static boolean isValidBoolean(String value) {
+ return "true".equalsIgnoreCase(value) ||
"false".equalsIgnoreCase(value);
+ }
+
public static boolean isPositiveInt(String value) {
if (value == null) {
return false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 55c761a5d42..09090ba3ad1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -276,6 +276,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
sourceProperties.put(DataSourceConfigKeys.OFFSET,
DataSourceConfigKeys.OFFSET_LATEST);
}
+ // from-to is at-least-once; default-skip in-snapshot backfill.
+
sourceProperties.putIfAbsent(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL,
"true");
}
private List<String> createTableIfNotExists() throws Exception {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 1d3fdf66f04..c257613749d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -82,7 +82,6 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
private long loadBytes = 0L;
private long filteredRows = 0L;
private long loadedRows = 0L;
- private long timeoutMs;
private long runningBackendId;
public StreamingMultiTblTask(Long jobId,
@@ -103,7 +102,6 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
this.jobProperties = jobProperties;
this.targetDb = targetDb;
this.cloudCluster = cloudCluster;
- this.timeoutMs = Config.streaming_task_timeout_multiplier *
jobProperties.getMaxIntervalSecond() * 1000L;
}
@Override
@@ -327,6 +325,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
// It's still pending, waiting for scheduling.
return false;
}
+ // Read multiplier live so config changes affect already-running tasks.
+ long timeoutMs = Config.streaming_task_timeout_multiplier
+ * jobProperties.getMaxIntervalSecond() * 1000L;
long elapsed = System.currentTimeMillis() - startTimeMs;
if (elapsed > timeoutMs) {
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms",
taskId, elapsed, timeoutMs);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index aa60924601c..ae3c815789f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -156,6 +156,7 @@ public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunctio
}
validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+ validateBooleanIfPresent(properties,
DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL);
// TVF entrypoint shares server_id checks with the from-to path's
validateSource.
try {
DataSourceConfigValidator.validateServerIdConfig(properties);
@@ -186,6 +187,17 @@ public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunctio
}
}
+ private static void validateBooleanIfPresent(Map<String, String>
properties, String key)
+ throws AnalysisException {
+ String value = properties.get(key);
+ if (value == null) {
+ return;
+ }
+ if (!DataSourceConfigValidator.isValidBoolean(value)) {
+ throw new AnalysisException("Invalid value for key '" + key + "':
" + value);
+ }
+ }
+
private void generateFileStatus() {
this.fileStatuses.clear();
this.fileStatuses.add(new TBrokerFileStatus(URI, false,
Integer.MAX_VALUE, false));
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 9b2dd5d357f..18bd9fcc211 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -168,6 +168,7 @@ public class PipelineCoordinator {
long elapsedTime = System.currentTimeMillis() - startTime;
boolean timeoutReached = elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
if (shouldStop(
+ sourceReader,
isSnapshotSplit,
hasReceivedData,
lastMessageIsHeartbeat,
@@ -315,6 +316,7 @@ public class PipelineCoordinator {
boolean timeoutReached = elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
if (shouldStop(
+ sourceReader,
isSnapshotSplit,
hasReceivedData,
lastMessageIsHeartbeat,
@@ -484,6 +486,7 @@ public class PipelineCoordinator {
&& elapsedTime >= maxIntervalMillis;
if (shouldStop(
+ sourceReader,
isSnapshotSplit,
scannedRows > 0,
lastMessageIsHeartbeat,
@@ -615,6 +618,7 @@ public class PipelineCoordinator {
* @return true if should stop, false if should continue
*/
private boolean shouldStop(
+ SourceReader sourceReader,
boolean isSnapshotSplit,
boolean hasData,
boolean lastMessageIsHeartbeat,
@@ -622,11 +626,13 @@ public class PipelineCoordinator {
long maxIntervalMillis,
boolean timeoutReached) {
- // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
- // for timeout)
- // snapshot split will be written to the debezium queue all at once.
- // multiple snapshot splits are handled in the source reader.
+ // Snapshot split: wait until every split has received its
high-watermark event;
+ // an empty poll alone is not a finish signal under pollWithoutBuffer
where the
+ // fetcher returns one ChangeEventQueue batch at a time.
if (isSnapshotSplit) {
+ if (!sourceReader.isSnapshotFinished()) {
+ return false;
+ }
LOG.info(
"Snapshot split finished, no more data available. Total
elapsed: {} ms",
elapsedTime);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 4583b049b81..d1aa7ccb38f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -417,7 +417,7 @@ public class DorisBatchStreamLoad implements Serializable {
OBJECT_MAPPER.readValue(loadResult,
RespContent.class);
if
(DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
long cacheByteBeforeFlush =
-
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
LOG.info(
"load success, cacheBeforeFlushBytes:
{}, currentCacheBytes : {}",
cacheByteBeforeFlush,
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index ddbc71c7fd7..f0987eb97ee 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -468,6 +468,9 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
return Collections.emptyIterator();
}
+ // A split is finished only after its high-watermark event has been
consumed.
+ refreshCompletedSplits();
+
if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
LOG.info("All {} snapshot splits have been completed",
snapshotReaderContexts.size());
return Collections.emptyIterator();
@@ -515,6 +518,11 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>
context = snapshotReaderContexts.get(index);
+ // Skip splits already drained to high-watermark; otherwise their
poll futures spin
+ // returning null and starve siblings.
+ if (completedSplitIds.contains(context.getSplit().splitId())) {
+ continue;
+ }
CompletableFuture<PollResult> future =
CompletableFuture.supplyAsync(
@@ -569,11 +577,12 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
+ // Split completion is determined later by
splitState.getHighWatermark()
+ // != null, not by receiving a non-empty batch.
LOG.info(
"Got result from reader {}, {} futures
remaining",
result.context.getSplit().splitId(),
snapshot.size());
-
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other
futures
@@ -839,6 +848,35 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
return offsetRes;
}
+ @Override
+ public boolean isSnapshotFinished() {
+ if (snapshotReaderContexts.isEmpty()) {
+ return true;
+ }
+ for (SnapshotReaderContext<
+
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
+ Fetcher<SourceRecords, SourceSplitBase>,
+ SnapshotSplitState>
+ context : snapshotReaderContexts) {
+ if (context.getSplitState().getHighWatermark() == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void refreshCompletedSplits() {
+ for (SnapshotReaderContext<
+
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
+ Fetcher<SourceRecords, SourceSplitBase>,
+ SnapshotSplitState>
+ context : snapshotReaderContexts) {
+ if (context.getSplitState().getHighWatermark() != null) {
+ completedSplitIds.add(context.getSplit().splitId());
+ }
+ }
+ }
+
@Override
public Map<String, String> extractBinlogStateOffset(Object splitState) {
Preconditions.checkNotNull(splitState, "splitState is null");
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index 95eeb052681..b41f66f89fb 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -98,4 +98,9 @@ public interface SourceReader {
* indicate how far the source TX log can be discarded.
*/
default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}
+
+ /** Whether all snapshot splits have received their high-watermark event.
*/
+ default boolean isSnapshotFinished() {
+ return true;
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index e5115d1c51a..1bc7db23fd4 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -489,6 +489,9 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
return Collections.emptyIterator();
}
+ // A split is finished only after its high-watermark event has been
consumed.
+ refreshCompletedSplits();
+
if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
LOG.info("All {} snapshot splits have been completed",
snapshotReaderContexts.size());
return Collections.emptyIterator();
@@ -533,6 +536,11 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
final int index = i;
SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState>
context = snapshotReaderContexts.get(index);
+ // Skip splits already drained to high-watermark; otherwise their
poll futures spin
+ // returning null and starve siblings.
+ if (completedSplitIds.contains(context.getSplit().splitId())) {
+ continue;
+ }
CompletableFuture<PollResult> future =
CompletableFuture.supplyAsync(
@@ -587,11 +595,12 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
+ // Split completion is determined later by
splitState.getHighWatermark()
+ // != null, not by receiving a non-empty batch.
LOG.info(
"Got result from reader {}, {} futures
remaining",
result.context.getSplit().splitId(),
snapshot.size());
-
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other
futures
@@ -992,6 +1001,10 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
objectPath,
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
}
+ // FE injects "true" on the from-to (CREATE JOB) path; TVF leaves it absent →
default false.
+ configFactory.skipSnapshotBackfill(
+
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
+
return configFactory.createConfig(subtaskId);
}
@@ -1013,6 +1026,29 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
return new HashMap<>(highWatermark.getOffset());
}
+ @Override
+ public boolean isSnapshotFinished() {
+ if (snapshotReaderContexts.isEmpty()) {
+ return true;
+ }
+ for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState>
+ context : snapshotReaderContexts) {
+ if (context.getSplitState().getHighWatermark() == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void refreshCompletedSplits() {
+ for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState>
+ context : snapshotReaderContexts) {
+ if (context.getSplitState().getHighWatermark() != null) {
+ completedSplitIds.add(context.getSplit().splitId());
+ }
+ }
+ }
+
@Override
public Map<String, String> extractBinlogStateOffset(Object splitState) {
Preconditions.checkNotNull(splitState, "splitState is null");
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 8bf53a1eb97..2e09e48957a 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -279,6 +279,10 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
// support scan partition table
configFactory.setIncludePartitionedTables(true);
+ // FE injects "true" on the from-to (CREATE JOB) path; TVF leaves it absent →
default false.
+ configFactory.skipSnapshotBackfill(
+
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
+
// subtaskId use pg create slot in snapshot phase, slotname is
slot_name_subtaskId
return configFactory.create(subtaskId);
}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
new file mode 100644
index 00000000000..c941a5a0afc
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression for the snapshot completion bug: a single split whose row count
exceeds
+// debezium's default max.batch.size=2048 must still be drained completely.
The fix
+// keys completion off the high-watermark event instead of the first non-empty
batch.
+// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at
least
+// two batches to drain (2048 + 52 + hw event).
+suite("test_streaming_mysql_job_snapshot_fat_split",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_snapshot_fat_split_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_fat_split_mysql"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ===== Prepare MySQL side: 2100 rows so a single split spans > 1
fetcher batch =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `id` bigint NOT NULL,
+ `payload` varchar(32),
+ `version` int,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+
+ // Bulk insert in 500-row chunks to stay under MySQL's default
max_allowed_packet.
+ int total = 2100
+ int chunk = 500
+ int sent = 0
+ while (sent < total) {
+ int end = Math.min(sent + chunk, total)
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${mysqlDb}.${table1} (id, payload,
version) VALUES ")
+ for (int i = sent + 1; i <= end; i++) {
+ if (i > sent + 1) sb.append(", ")
+ sb.append("(${i}, 'snap', 0)")
+ }
+ sql sb.toString()
+ sent = end
+ }
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "3000"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 2100
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM
${currentDb}.${table1}"""
+ assert distinctCount.get(0).get(0) == 2100
+ def boundary = sql """SELECT MIN(id), MAX(id) FROM
${currentDb}.${table1}"""
+ assert boundary.get(0).get(0) == 1
+ assert boundary.get(0).get(1) == 2100
+ // Specifically assert rows past the first batch (id > 2048) are
present.
+ def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id
BETWEEN 2049 AND 2100"""
+ assert tail.get(0).get(0) == 52
+
+ // ===== Incremental phase: verify post-snapshot DML still flows =====
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${table1} (id, payload, version)
VALUES (3000, 'incr_ins', 1)"""
+ sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id=2100"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=1"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd = sql """select version from
${currentDb}.${table1} where id=2100"""
+ def ins = sql """select count(1) from
${currentDb}.${table1} where id=3000"""
+ def del = sql """select count(1) from
${currentDb}.${table1} where id=1"""
+ def v = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} id2100.version=${v}
id3000.exists=${ins} id1.exists=${del}")
+ cnt.get(0).get(0) == 2100 &&
+ v != null && v.toString() == '99' &&
+ ins.get(0).get(0) == 1 &&
+ del.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
new file mode 100644
index 00000000000..117e4495453
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression for the snapshot completion bug: a single split whose row count
exceeds
+// debezium's default max.batch.size=2048 must still be drained completely.
The fix
+// keys completion off the high-watermark event instead of the first non-empty
batch.
+// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at
least
+// two batches to drain (2048 + 52 + hw event).
+suite("test_streaming_postgres_job_snapshot_fat_split",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_snapshot_fat_split_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_fat_split_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side: 2100 rows so a single split spans > 1
fetcher batch =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """create table ${pgDB}.${pgSchema}.${table1} (
+ id bigint PRIMARY KEY,
+ payload varchar(32),
+ version integer
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload,
version)
+ SELECT g, 'snap', 0 FROM generate_series(1, 2100) g"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "snapshot_split_size" = "3000"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ log.info("snapshot row count: " + cnt)
+ cnt.get(0).get(0) == 2100
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM
${currentDb}.${table1}"""
+ assert distinctCount.get(0).get(0) == 2100
+ def boundary = sql """SELECT MIN(id), MAX(id) FROM
${currentDb}.${table1}"""
+ assert boundary.get(0).get(0) == 1
+ assert boundary.get(0).get(1) == 2100
+ // Specifically assert rows past the first batch (id > 2048) are
present.
+ def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id
BETWEEN 2049 AND 2100"""
+ assert tail.get(0).get(0) == 52
+
+ // ===== Incremental phase: verify post-snapshot DML still flows =====
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload,
version) VALUES (3000, 'incr_ins', 1)"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE
id=2100"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id=1"""
+ }
+
+ try {
+ Awaitility.await().atMost(180, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def cnt = sql """select count(1) from
${currentDb}.${table1}"""
+ def upd = sql """select version from
${currentDb}.${table1} where id=2100"""
+ def ins = sql """select count(1) from
${currentDb}.${table1} where id=3000"""
+ def del = sql """select count(1) from
${currentDb}.${table1} where id=1"""
+ def v = upd.size() == 0 ? null : upd.get(0).get(0)
+ log.info("incr cnt=${cnt} id2100.version=${v}
id3000.exists=${ins} id1.exists=${del}")
+ cnt.get(0).get(0) == 2100 &&
+ v != null && v.toString() == '99' &&
+ ins.get(0).get(0) == 1 &&
+ del.get(0).get(0) == 0
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job (incr): " + showjob)
+ log.info("show task (incr): " + showtask)
+ throw ex
+ }
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]