Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3232807343


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -473,6 +519,58 @@ public void replayIfNeed(StreamingInsertJob job) throws 
JobException {
         } else {
             log.info("No need to replay offset provider for job {}", 
getJobId());
         }
+
+        // Resume cdcSplitProgress from the at-most-one table cut to mid so 
the next
+        // advanceSplits() RPC won't re-cut already-fetched splits.
+        synchronized (splitsLock) {
+            if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+                return;
+            }
+            SnapshotSplit mid = findResumeMidSplit(cachedSyncTables, 
finishedSplits, remainingSplits);
+            if (mid != null) {
+                applySplitToProgress(cdcSplitProgress, mid);
+            } else {
+                clearProgress(cdcSplitProgress);
+            }
+            log.info("Replay summary for job {}: finishedSplits={}, 
remainingSplits={}, "
+                            + "committedSplitProgress=(table={}, nextStart={}, 
nextSplitId={}), "
+                            + "cdcSplitProgress=(table={}, nextStart={}, 
nextSplitId={})",
+                    getJobId(), finishedSplits.size(), remainingSplits.size(),
+                    committedSplitProgress == null ? null : 
committedSplitProgress.getCurrentSplittingTable(),
+                    committedSplitProgress == null ? null : 
Arrays.toString(committedSplitProgress.getNextSplitStart()),
+                    committedSplitProgress == null ? null : 
committedSplitProgress.getNextSplitId(),
+                    cdcSplitProgress.getCurrentSplittingTable(),
+                    Arrays.toString(cdcSplitProgress.getNextSplitStart()),
+                    cdcSplitProgress.getNextSplitId());
+        }
+    }
+
+    /**
+     * Find the at-most-one table cut to mid (its largest-id split has 
non-null splitEnd).
+     * Returns null when every table in {@code syncTables} is either untouched 
or fully cut.
+     */
+    static SnapshotSplit findResumeMidSplit(List<String> syncTables,
+                                            List<SnapshotSplit> finishedSplits,
+                                            List<SnapshotSplit> 
remainingSplits) {
+        Map<String, SnapshotSplit> lastPerTable = new HashMap<>();
+        pickLastById(finishedSplits, lastPerTable);
+        pickLastById(remainingSplits, lastPerTable);
+        for (String tbl : syncTables) {
+            SnapshotSplit last = lastPerTable.get(tbl);
+            if (last != null && last.getSplitEnd() != null && 
last.getSplitEnd().length > 0) {
+                return last;
+            }
+        }

Review Comment:
   findResumeMidSplit() keys `lastPerTable` by `SnapshotSplit.tableId` 
(qualified) but then looks up entries using values from `syncTables` (often 
unqualified table names). If these formats differ, replay will fail to detect a 
mid-table cut and will clear cdcSplitProgress, causing unnecessary re-cutting 
and potentially incorrect resume logic. Consider normalizing both sides (e.g., 
compare by `getTableName(...)`) or storing syncTables in the same qualified 
form used in SnapshotSplit.tableId.
   
   This issue also appears in the following locations of the same file:
   - line 750
   - line 798



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -163,21 +169,52 @@ public static void deleteJobMeta(Long jobId) {
         }
     }
 
-    public static void insertSplitsToMeta(Long jobId, Map<String, 
List<SnapshotSplit>> tableSplits) throws Exception {
-        List<String> values = new ArrayList<>();
-        int index = 1;
-        for (Map.Entry<String, List<SnapshotSplit>> entry : 
tableSplits.entrySet()) {
-            Map<String, String> params = new HashMap<>();
-            params.put("id", index + "");
-            params.put("job_id", jobId + "");
-            params.put("table_name", entry.getKey());
-            params.put("chunk_list", 
objectMapper.writeValueAsString(entry.getValue()));
-            StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-            String sql = 
stringSubstitutor.replace(INSERT_INTO_META_TABLE_TEMPLATE);
-            values.add(sql);
-            index++;
+    /**
+     * UPSERT a single table's chunk_list. id is reused if the table already 
has a row,
+     * otherwise allocated as MAX(id)+1. Relies on UNIQUE KEY (id, job_id) for 
in-place override.
+     */
+    public static void upsertChunkList(Long jobId, String tableName, 
List<SnapshotSplit> chunks) throws Exception {
+        createMetaTableIfNotExist();
+        Integer id = querySingleTableId(jobId, tableName);
+        if (id == null) {
+            id = queryNextAvailableId(jobId);
+        }
+        Map<String, String> params = new HashMap<>();
+        params.put("id", String.valueOf(id));
+        params.put("job_id", String.valueOf(jobId));
+        params.put("table_name", tableName);
+        params.put("chunk_list", objectMapper.writeValueAsString(chunks));
+        StringSubstitutor sub = new StringSubstitutor(params);
+        String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
+        batchInsert(Collections.singletonList(sql));
+    }
+
+    /** Returns id of the row matching (jobId, tableName), or null if no such 
row exists. */
+    private static Integer querySingleTableId(Long jobId, String tableName) 
throws JobException {
+        String sql = String.format(SELECT_TABLE_ID_TEMPLATE, jobId, tableName);
+        try (AutoCloseConnectContext ctx = new 
AutoCloseConnectContext(buildConnectContext())) {
+            StmtExecutor stmtExecutor = new StmtExecutor(ctx.connectContext, 
sql);
+            List<ResultRow> rows = stmtExecutor.executeInternalQuery();
+            if (rows == null || rows.isEmpty()) {
+                return null;
+            }
+            return Integer.parseInt(rows.get(0).get(0));
+        } catch (Exception e) {
+            throw new JobException("query table id failed: " + e.getMessage());
+        }
+    }

Review Comment:
   upsertChunkList() builds SQL by string substitution and wraps `table_name` 
and `chunk_list` in single quotes without escaping. Since chunk boundaries can 
contain arbitrary string PK values, the JSON may contain `'` or `\`, which will 
break the INSERT or allow SQL injection into the internal meta table. Please 
escape substituted values (e.g. via `StatisticsUtil.escapeSQL`) or switch to a 
parameterized/internal API that avoids raw SQL concatenation for string/JSON 
fields.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +133,99 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, 
nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then 
closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest 
modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
-        }
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
 
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
-                    snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+        ChunkSplitterState state = buildChunkSplitterState(sourceConfig, 
tableId, ftsReq);
+        ChunkSplitter splitter = 
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
+                for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk :
+                        chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));

Review Comment:
   Same as MySqlSourceReader: the batching loop appends the entire `chunks` 
collection returned by `splitter.generateSplits(...)` without enforcing the 
`batchSize` limit. If the underlying splitter returns multiple splits in one 
call, this method can exceed the requested batch size and undermine the FE-side 
async batching assumptions. Consider capping the returned list or 
clarifying/renaming `batchSize` to indicate it's best-effort.
   
   This issue also appears on line 203 of the same file.
   



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -158,46 +161,148 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
MySqlChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC rebuilds the splitter from (table, 
nextSplitStart, nextSplitId)
+     * supplied by FE, splits up to {@code batchSize} chunks, then closes. 
Note: evenly-distributed
+     * PKs go through a single splitChunks() call returning all chunks at 
once, so batchSize is only
+     * effective on the uneven path.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes go through the chunk path; other 
modes return a single
+     * BinlogSplit instead.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq);
         StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
-        List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
-        MySqlBinlogSplit remainingBinlogSplit = null;
-        if (startupMode.equals(StartupMode.INITIAL) || 
startupMode.equals(StartupMode.SNAPSHOT)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            remainingBinlogSplit =
-                    new MySqlBinlogSplit(
-                            BINLOG_SPLIT_ID,
-                            sourceConfig.getStartupOptions().binlogOffset,
-                            BinlogOffset.ofNonStopping(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+
+        if (!startupMode.equals(StartupMode.INITIAL) && 
!startupMode.equals(StartupMode.SNAPSHOT)) {
+            BinlogSplit binlogSplit = new BinlogSplit();
+            binlogSplit.setSplitId(BINLOG_SPLIT_ID);
+            binlogSplit.setStartingOffset(
+                    sourceConfig.getStartupOptions().binlogOffset.getOffset());
+            return Collections.singletonList(binlogSplit);
         }
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+
+        String database = 
ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE);
+        TableId tableId = TableId.parse(database + "." + 
ftsReq.getSnapshotTable());
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+
+        boolean isCaseSensitive;
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+            isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to query table id case 
sensitivity", e);
+        }
+        MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, 
isCaseSensitive);
+        MySqlPartition partition =
+                new 
MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+
+        ChunkSplitterState state =
+                buildChunkSplitterState(sourceConfig, tableId, ftsReq, 
mySqlSchema, partition);
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema, 
sourceConfig, state);
+
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                List<MySqlSnapshotSplit> chunks = 
splitter.splitChunks(partition, tableId);
+                for (MySqlSnapshotSplit chunk : chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {

Review Comment:
   getSourceSplits() can return more than `batchSize` splits because it appends 
the full `chunks` list from each `splitter.splitChunks(...)` call without 
capping to the remaining budget. Even if the uneven path often returns a single 
chunk, the code currently doesn't enforce the documented batchSize behavior and 
could cause FE to persist/process unexpectedly large split batches. Consider 
limiting the returned list to `batchSize` (or clarifying that batchSize is 
best-effort) and making the cursor/progress semantics consistent with that 
choice.
   
   This issue also appears on line 257 of the same file.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to