This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e27f117471 [improve](streaming-job) avoid potential OOM when reading 
large snapshot splits (#63833)
6e27f117471 is described below

commit 6e27f117471f481e13cebabe0454dddc60e5245c
Author: wudi <[email protected]>
AuthorDate: Fri May 29 11:39:11 2026 +0800

    [improve](streaming-job) avoid potential OOM when reading large snapshot 
splits (#63833)
    
    ## Summary
    - Default-skip flink-cdc's in-snapshot backfill on the from-to path so
    large splits no longer accumulate the entire chunk + backfill stream in
    the fetcher's outputBuffer; from-to is at-least-once and tolerates the
    duplicates this introduces. TVF (job-driven and standalone) keeps the
    standard `false` default for exactly-once via per-task offset commit.
    - Expose `skip_snapshot_backfill` as a user-facing property with strict
    `true`/`false` validation on both from-to (CREATE JOB) and TVF (SELECT
    FROM cdc_stream(...)) entry points.
    - Fix snapshot completion under `pollWithoutBuffer`: a split is now
    marked complete only after its high-watermark event has been consumed
    (`splitState.getHighWatermark() != null`), not on the first non-empty
    fetcher batch. Without this, enabling the new default truncates any
    split larger than debezium's `max.batch.size` and yields an NPE on
    offset extraction.
    - Read `streaming_task_timeout_multiplier` live in
    `StreamingMultiTblTask.isTimeout()` so `admin set frontend config`
    affects already-running tasks, matching the `@ConfField(mutable=true)`
    contract.
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   1 +
 .../streaming/DataSourceConfigValidator.java       |  10 ++
 .../insert/streaming/StreamingInsertJob.java       |   2 +
 .../insert/streaming/StreamingMultiTblTask.java    |   5 +-
 .../CdcStreamTableValuedFunction.java              |  12 ++
 .../cdcclient/service/PipelineCoordinator.java     |  14 +-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java |   2 +-
 .../source/reader/JdbcIncrementalSourceReader.java |  40 +++++-
 .../cdcclient/source/reader/SourceReader.java      |   5 +
 .../source/reader/mysql/MySqlSourceReader.java     |  38 ++++-
 .../reader/postgres/PostgresSourceReader.java      |   4 +
 ...t_streaming_mysql_job_snapshot_fat_split.groovy | 153 +++++++++++++++++++++
 ...treaming_postgres_job_snapshot_fat_split.groovy | 141 +++++++++++++++++++
 13 files changed, 418 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 3708e8dc6a3..75617e4a907 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,7 @@ public class DataSourceConfigKeys {
     public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+    public static final String SKIP_SNAPSHOT_BACKFILL = 
"skip_snapshot_backfill";
     // MySQL CDC client identity. Single value "5400" or range "5400-5408".
     public static final String SERVER_ID = "server_id";
     public static final String SSL_MODE = "ssl_mode";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 4ca1e605ef5..1c1dc10c0d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -50,6 +50,7 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.EXCLUDE_TABLES,
             DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
             DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+            DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL,
             DataSourceConfigKeys.SSL_MODE,
             DataSourceConfigKeys.SSL_ROOTCERT,
             DataSourceConfigKeys.SLOT_NAME,
@@ -208,12 +209,21 @@ public class DataSourceConfigValidator {
                 || key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
             return isPositiveInt(value);
         }
+        if (key.equals(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)) {
+            return isValidBoolean(value);
+        }
         if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
             return parseServerIdRange(value) != null;
         }
         return true;
     }
 
+    // Strict boolean: only "true"/"false" (case-insensitive); 
Boolean.parseBoolean would
+    // silently coerce typos like "yes" to false.
+    public static boolean isValidBoolean(String value) {
+        return "true".equalsIgnoreCase(value) || 
"false".equalsIgnoreCase(value);
+    }
+
     public static boolean isPositiveInt(String value) {
         if (value == null) {
             return false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 55c761a5d42..09090ba3ad1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -276,6 +276,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
             sourceProperties.put(DataSourceConfigKeys.OFFSET, 
DataSourceConfigKeys.OFFSET_LATEST);
         }
+        // from-to is at-least-once; default-skip in-snapshot backfill.
+        
sourceProperties.putIfAbsent(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL, 
"true");
     }
 
     private List<String> createTableIfNotExists() throws Exception {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 1d3fdf66f04..c257613749d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -82,7 +82,6 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private long loadBytes = 0L;
     private long filteredRows = 0L;
     private long loadedRows = 0L;
-    private long timeoutMs;
     private long runningBackendId;
 
     public StreamingMultiTblTask(Long jobId,
@@ -103,7 +102,6 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         this.jobProperties = jobProperties;
         this.targetDb = targetDb;
         this.cloudCluster = cloudCluster;
-        this.timeoutMs = Config.streaming_task_timeout_multiplier * 
jobProperties.getMaxIntervalSecond() * 1000L;
     }
 
     @Override
@@ -327,6 +325,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
             // It's still pending, waiting for scheduling.
             return false;
         }
+        // Read multiplier live so config changes affect already-running tasks.
+        long timeoutMs = Config.streaming_task_timeout_multiplier
+                * jobProperties.getMaxIntervalSecond() * 1000L;
         long elapsed = System.currentTimeMillis() - startTimeMs;
         if (elapsed > timeoutMs) {
             log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", 
taskId, elapsed, timeoutMs);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index aa60924601c..ae3c815789f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -156,6 +156,7 @@ public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunctio
         }
         validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
         validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+        validateBooleanIfPresent(properties, 
DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL);
         // TVF entrypoint shares server_id checks with the from-to path's 
validateSource.
         try {
             DataSourceConfigValidator.validateServerIdConfig(properties);
@@ -186,6 +187,17 @@ public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunctio
         }
     }
 
+    private static void validateBooleanIfPresent(Map<String, String> 
properties, String key)
+            throws AnalysisException {
+        String value = properties.get(key);
+        if (value == null) {
+            return;
+        }
+        if (!DataSourceConfigValidator.isValidBoolean(value)) {
+            throw new AnalysisException("Invalid value for key '" + key + "': 
" + value);
+        }
+    }
+
     private void generateFileStatus() {
         this.fileStatuses.clear();
         this.fileStatuses.add(new TBrokerFileStatus(URI, false, 
Integer.MAX_VALUE, false));
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 9b2dd5d357f..18bd9fcc211 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -168,6 +168,7 @@ public class PipelineCoordinator {
                     long elapsedTime = System.currentTimeMillis() - startTime;
                     boolean timeoutReached = elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
                     if (shouldStop(
+                            sourceReader,
                             isSnapshotSplit,
                             hasReceivedData,
                             lastMessageIsHeartbeat,
@@ -315,6 +316,7 @@ public class PipelineCoordinator {
                     boolean timeoutReached = elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
 
                     if (shouldStop(
+                            sourceReader,
                             isSnapshotSplit,
                             hasReceivedData,
                             lastMessageIsHeartbeat,
@@ -484,6 +486,7 @@ public class PipelineCoordinator {
                                     && elapsedTime >= maxIntervalMillis;
 
                     if (shouldStop(
+                            sourceReader,
                             isSnapshotSplit,
                             scannedRows > 0,
                             lastMessageIsHeartbeat,
@@ -615,6 +618,7 @@ public class PipelineCoordinator {
      * @return true if should stop, false if should continue
      */
     private boolean shouldStop(
+            SourceReader sourceReader,
             boolean isSnapshotSplit,
             boolean hasData,
             boolean lastMessageIsHeartbeat,
@@ -622,11 +626,13 @@ public class PipelineCoordinator {
             long maxIntervalMillis,
             boolean timeoutReached) {
 
-        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
-        // for timeout)
-        // snapshot split will be written to the debezium queue all at once.
-        // multiple snapshot splits are handled in the source reader.
+        // Snapshot split: wait until every split has received its 
high-watermark event;
+        // an empty poll alone is not a finish signal under pollWithoutBuffer 
where the
+        // fetcher returns one ChangeEventQueue batch at a time.
         if (isSnapshotSplit) {
+            if (!sourceReader.isSnapshotFinished()) {
+                return false;
+            }
             LOG.info(
                     "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
                     elapsedTime);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 4583b049b81..d1aa7ccb38f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -417,7 +417,7 @@ public class DorisBatchStreamLoad implements Serializable {
                                     OBJECT_MAPPER.readValue(loadResult, 
RespContent.class);
                             if 
(DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                 long cacheByteBeforeFlush =
-                                        
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                                        
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
                                 LOG.info(
                                         "load success, cacheBeforeFlushBytes: 
{}, currentCacheBytes : {}",
                                         cacheByteBeforeFlush,
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index ddbc71c7fd7..f0987eb97ee 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -468,6 +468,9 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
             return Collections.emptyIterator();
         }
 
+        // A split is finished only after its high-watermark event has been 
consumed.
+        refreshCompletedSplits();
+
         if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
             LOG.info("All {} snapshot splits have been completed", 
snapshotReaderContexts.size());
             return Collections.emptyIterator();
@@ -515,6 +518,11 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                             Fetcher<SourceRecords, SourceSplitBase>,
                             SnapshotSplitState>
                     context = snapshotReaderContexts.get(index);
+            // Skip splits already drained to high-watermark; otherwise their 
poll futures spin
+            // returning null and starve siblings.
+            if (completedSplitIds.contains(context.getSplit().splitId())) {
+                continue;
+            }
 
             CompletableFuture<PollResult> future =
                     CompletableFuture.supplyAsync(
@@ -569,11 +577,12 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                     snapshot.remove(future);
                     PollResult result = future.get();
                     if (result != null) {
+                        // Split completion is determined later by 
splitState.getHighWatermark()
+                        // != null, not by receiving a non-empty batch.
                         LOG.info(
                                 "Got result from reader {}, {} futures 
remaining",
                                 result.context.getSplit().splitId(),
                                 snapshot.size());
-                        
completedSplitIds.add(result.context.getSplit().splitId());
                         return result;
                     }
                     // If result is null (no data), continue checking other 
futures
@@ -839,6 +848,35 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
         return offsetRes;
     }
 
+    @Override
+    public boolean isSnapshotFinished() {
+        if (snapshotReaderContexts.isEmpty()) {
+            return true;
+        }
+        for (SnapshotReaderContext<
+                        
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
+                        Fetcher<SourceRecords, SourceSplitBase>,
+                        SnapshotSplitState>
+                context : snapshotReaderContexts) {
+            if (context.getSplitState().getHighWatermark() == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void refreshCompletedSplits() {
+        for (SnapshotReaderContext<
+                        
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
+                        Fetcher<SourceRecords, SourceSplitBase>,
+                        SnapshotSplitState>
+                context : snapshotReaderContexts) {
+            if (context.getSplitState().getHighWatermark() != null) {
+                completedSplitIds.add(context.getSplit().splitId());
+            }
+        }
+    }
+
     @Override
     public Map<String, String> extractBinlogStateOffset(Object splitState) {
         Preconditions.checkNotNull(splitState, "splitState is null");
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index 95eeb052681..b41f66f89fb 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -98,4 +98,9 @@ public interface SourceReader {
      * indicate how far the source TX log can be discarded.
      */
     default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}
+
+    /** Whether all snapshot splits have received their high-watermark event. 
*/
+    default boolean isSnapshotFinished() {
+        return true;
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index e5115d1c51a..1bc7db23fd4 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -489,6 +489,9 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
             return Collections.emptyIterator();
         }
 
+        // A split is finished only after its high-watermark event has been 
consumed.
+        refreshCompletedSplits();
+
         if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
             LOG.info("All {} snapshot splits have been completed", 
snapshotReaderContexts.size());
             return Collections.emptyIterator();
@@ -533,6 +536,11 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
             final int index = i;
             SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState>
                     context = snapshotReaderContexts.get(index);
+            // Skip splits already drained to high-watermark; otherwise their 
poll futures spin
+            // returning null and starve siblings.
+            if (completedSplitIds.contains(context.getSplit().splitId())) {
+                continue;
+            }
 
             CompletableFuture<PollResult> future =
                     CompletableFuture.supplyAsync(
@@ -587,11 +595,12 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                     snapshot.remove(future);
                     PollResult result = future.get();
                     if (result != null) {
+                        // Split completion is determined later by 
splitState.getHighWatermark()
+                        // != null, not by receiving a non-empty batch.
                         LOG.info(
                                 "Got result from reader {}, {} futures 
remaining",
                                 result.context.getSplit().splitId(),
                                 snapshot.size());
-                        
completedSplitIds.add(result.context.getSplit().splitId());
                         return result;
                     }
                     // If result is null (no data), continue checking other 
futures
@@ -992,6 +1001,10 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                     objectPath, 
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
         }
 
+        // FE injects "true" on TVF path; from-to leaves it absent → default 
false.
+        configFactory.skipSnapshotBackfill(
+                
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
+
         return configFactory.createConfig(subtaskId);
     }
 
@@ -1013,6 +1026,29 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         return new HashMap<>(highWatermark.getOffset());
     }
 
+    @Override
+    public boolean isSnapshotFinished() {
+        if (snapshotReaderContexts.isEmpty()) {
+            return true;
+        }
+        for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState>
+                context : snapshotReaderContexts) {
+            if (context.getSplitState().getHighWatermark() == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void refreshCompletedSplits() {
+        for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState>
+                context : snapshotReaderContexts) {
+            if (context.getSplitState().getHighWatermark() != null) {
+                completedSplitIds.add(context.getSplit().splitId());
+            }
+        }
+    }
+
     @Override
     public Map<String, String> extractBinlogStateOffset(Object splitState) {
         Preconditions.checkNotNull(splitState, "splitState is null");
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 8bf53a1eb97..2e09e48957a 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -279,6 +279,10 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         // support scan partition table
         configFactory.setIncludePartitionedTables(true);
 
+        // FE injects "true" on TVF path; from-to leaves it absent → default 
false.
+        configFactory.skipSnapshotBackfill(
+                
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
+
         // subtaskId use pg create slot in snapshot phase, slotname is 
slot_name_subtaskId
         return configFactory.create(subtaskId);
     }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
new file mode 100644
index 00000000000..c941a5a0afc
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression for the snapshot completion bug: a single split whose row count 
exceeds
+// debezium's default max.batch.size=2048 must still be drained completely. 
The fix
+// keys completion off the high-watermark event instead of the first non-empty 
batch.
+// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at 
least
+// two batches to drain (2048 + 52 + hw event).
+suite("test_streaming_mysql_job_snapshot_fat_split", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_snapshot_fat_split_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_fat_split_mysql"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ===== Prepare MySQL side: 2100 rows so a single split spans > 1 
fetcher batch =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                       `id` bigint NOT NULL,
+                       `payload` varchar(32),
+                       `version` int,
+                       PRIMARY KEY (`id`)
+                   ) ENGINE=InnoDB"""
+
+            // Bulk insert in 500-row chunks to stay under MySQL's default 
max_allowed_packet.
+            int total = 2100
+            int chunk = 500
+            int sent = 0
+            while (sent < total) {
+                int end = Math.min(sent + chunk, total)
+                StringBuilder sb = new StringBuilder()
+                sb.append("INSERT INTO ${mysqlDb}.${table1} (id, payload, 
version) VALUES ")
+                for (int i = sent + 1; i <= end; i++) {
+                    if (i > sent + 1) sb.append(", ")
+                    sb.append("(${i}, 'snap', 0)")
+                }
+                sql sb.toString()
+                sent = end
+            }
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "3000"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 2100
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM 
${currentDb}.${table1}"""
+        assert distinctCount.get(0).get(0) == 2100
+        def boundary = sql """SELECT MIN(id), MAX(id) FROM 
${currentDb}.${table1}"""
+        assert boundary.get(0).get(0) == 1
+        assert boundary.get(0).get(1) == 2100
+        // Specifically assert rows past the first batch (id > 2048) are 
present.
+        def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id 
BETWEEN 2049 AND 2100"""
+        assert tail.get(0).get(0) == 52
+
+        // ===== Incremental phase: verify post-snapshot DML still flows =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} (id, payload, version) 
VALUES (3000, 'incr_ins', 1)"""
+            sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id=2100"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=1"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd = sql """select version from 
${currentDb}.${table1} where id=2100"""
+                        def ins = sql """select count(1) from 
${currentDb}.${table1} where id=3000"""
+                        def del = sql """select count(1) from 
${currentDb}.${table1} where id=1"""
+                        def v = upd.size() == 0 ? null : upd.get(0).get(0)
+                        log.info("incr cnt=${cnt} id2100.version=${v} 
id3000.exists=${ins} id1.exists=${del}")
+                        cnt.get(0).get(0) == 2100 &&
+                                v != null && v.toString() == '99' &&
+                                ins.get(0).get(0) == 1 &&
+                                del.get(0).get(0) == 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
new file mode 100644
index 00000000000..117e4495453
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression for the snapshot completion bug: a single split whose row count 
exceeds
+// debezium's default max.batch.size=2048 must still be drained completely. 
The fix
+// keys completion off the high-watermark event instead of the first non-empty 
batch.
+// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at 
least
+// two batches to drain (2048 + 52 + hw event).
+suite("test_streaming_postgres_job_snapshot_fat_split", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_snapshot_fat_split_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_fat_split_pg"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ===== Prepare PG side: 2100 rows so a single split spans > 1 
fetcher batch =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """create table ${pgDB}.${pgSchema}.${table1} (
+                       id      bigint PRIMARY KEY,
+                       payload varchar(32),
+                       version integer
+                   )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload, 
version)
+                   SELECT g, 'snap', 0 FROM generate_series(1, 2100) g"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "3000"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        log.info("snapshot row count: " + cnt)
+                        cnt.get(0).get(0) == 2100
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM 
${currentDb}.${table1}"""
+        assert distinctCount.get(0).get(0) == 2100
+        def boundary = sql """SELECT MIN(id), MAX(id) FROM 
${currentDb}.${table1}"""
+        assert boundary.get(0).get(0) == 1
+        assert boundary.get(0).get(1) == 2100
+        // Specifically assert rows past the first batch (id > 2048) are 
present.
+        def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id 
BETWEEN 2049 AND 2100"""
+        assert tail.get(0).get(0) == 52
+
+        // ===== Incremental phase: verify post-snapshot DML still flows =====
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload, 
version) VALUES (3000, 'incr_ins', 1)"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE 
id=2100"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id=1"""
+        }
+
+        try {
+            Awaitility.await().atMost(180, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def upd = sql """select version from 
${currentDb}.${table1} where id=2100"""
+                        def ins = sql """select count(1) from 
${currentDb}.${table1} where id=3000"""
+                        def del = sql """select count(1) from 
${currentDb}.${table1} where id=1"""
+                        def v = upd.size() == 0 ? null : upd.get(0).get(0)
+                        log.info("incr cnt=${cnt} id2100.version=${v} 
id3000.exists=${ins} id1.exists=${del}")
+                        cnt.get(0).get(0) == 2100 &&
+                                v != null && v.toString() == '99' &&
+                                ins.get(0).get(0) == 1 &&
+                                del.get(0).get(0) == 0
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job (incr): " + showjob)
+            log.info("show task (incr): " + showtask)
+            throw ex
+        }
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to