github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3248225695


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +134,95 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, 
nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then 
closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest 
modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
-        }
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+        ChunkSplitterState state = buildChunkSplitterState(sourceConfig, 
tableId, ftsReq);
+        ChunkSplitter splitter = 
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
 
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
-                    snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
+                for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk :
+                        chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {
+                    break;
+                }
             }
-        } else {
-            Offset startingOffset = remainingStreamSplit.getStartingOffset();
-            BinlogSplit streamSplit = new BinlogSplit();
-            streamSplit.setSplitId(remainingStreamSplit.splitId());
-            streamSplit.setStartingOffset(startingOffset.getOffset());
-            splits.add(streamSplit);
+            LOG.info(
+                    "Fetched {} splits for table {} (resume nextSplitId={}); 
hasNextChunk={}",
+                    result.size(),
+                    tableId,
+                    ftsReq.getNextSplitId(),
+                    splitter.hasNextChunk());
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to generate splits for " + 
tableId, e);
+        } finally {
+            try {
+                splitter.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close splitter for {}", tableId, e);
+            }
+        }
+    }
+
+    /**
+     * null start -> NO_SPLITTING_TABLE_STATE (analyze + maybe evenly); 
non-null -> resume
+     * mid-table. Cast pkValues[0] back to the JDBC driver's natural type 
(JSON round-trip
+     * downgrades types).
+     */
+    private ChunkSplitterState buildChunkSplitterState(
+            JdbcSourceConfig sourceConfig, TableId tableId, 
FetchTableSplitsRequest ftsReq) {
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        if (pkValues == null || pkValues.length == 0) {
+            return ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
         }
-        return splits;
+        TableChanges.TableChange tableChange = 
getTableSchemas(ftsReq).get(tableId);
+        Column splitColumn =
+                JdbcChunkUtils.getSplitColumn(
+                        tableChange.getTable(), 
sourceConfig.getChunkKeyColumn());
+        Class<?> targetClass = resolveSplitKeyClass(tableId, splitColumn, 
ftsReq);
+        Object castStart = objectMapper.convertValue(pkValues[0], targetClass);
+        int splitId = ftsReq.getNextSplitId() == null ? 0 : 
ftsReq.getNextSplitId();

Review Comment:
   This async resume path should use the same boundary conversion helper as the 
snapshot-read path. `nextSplitStart` has gone through FE JSON serialization, so 
temporal split keys arrive as strings; `objectMapper.convertValue(..., 
java.sql.Date/Timestamp/Time.class)` does not handle those the way 
`convertBounds()` does. A PostgreSQL/JDBC table split on DATE/TIME/TIMESTAMP 
can therefore fail or resume from an incorrectly typed boundary on the second 
`fetchSplits` RPC. Please convert the boundary with the existing helper, for 
example:
   ```suggestion
           Object castStart = convertBounds(pkValues, targetClass, objectMapper)[0];
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -161,46 +161,126 @@ public void initialize(String jobId, DataSource 
dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", 
parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link 
MySqlChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC rebuilds the splitter from (table, 
nextSplitStart, nextSplitId)
+     * supplied by FE, splits up to {@code batchSize} chunks, then closes. 
Note: evenly-distributed
+     * PKs go through a single splitChunks() call returning all chunks at 
once, so batchSize is only
+     * effective on the uneven path.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes go through the chunk path; other 
modes return a single
+     * BinlogSplit instead.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, 
nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq);
         StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
-        List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
-        MySqlBinlogSplit remainingBinlogSplit = null;
-        if (startupMode.equals(StartupMode.INITIAL) || 
startupMode.equals(StartupMode.SNAPSHOT)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
-        } else {
-            remainingBinlogSplit =
-                    new MySqlBinlogSplit(
-                            BINLOG_SPLIT_ID,
-                            sourceConfig.getStartupOptions().binlogOffset,
-                            BinlogOffset.ofNonStopping(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+
+        if (!startupMode.equals(StartupMode.INITIAL) && 
!startupMode.equals(StartupMode.SNAPSHOT)) {
+            BinlogSplit binlogSplit = new BinlogSplit();
+            binlogSplit.setSplitId(BINLOG_SPLIT_ID);
+            binlogSplit.setStartingOffset(
+                    sourceConfig.getStartupOptions().binlogOffset.getOffset());
+            return Collections.singletonList(binlogSplit);
         }
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
-                splits.add(split);
+
+        String database = 
ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE);
+        TableId tableId = TableId.parse(database + "." + 
ftsReq.getSnapshotTable());
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : 
ftsReq.getBatchSize();
+
+        boolean isCaseSensitive;
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+            isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to query table id case 
sensitivity", e);
+        }
+        MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, 
isCaseSensitive);
+        MySqlPartition partition =
+                new 
MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+
+        ChunkSplitterState state =
+                buildChunkSplitterState(sourceConfig, tableId, ftsReq, 
mySqlSchema, partition);
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema, 
sourceConfig, state);
+
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                List<MySqlSnapshotSplit> chunks = 
splitter.splitChunks(partition, tableId);
+                for (MySqlSnapshotSplit chunk : chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {
+                    break;
+                }
+            }
+            LOG.info(
+                    "Fetched {} splits for table {} (resume nextSplitId={}); 
hasNextChunk={}",
+                    result.size(),
+                    tableId,
+                    ftsReq.getNextSplitId(),
+                    splitter.hasNextChunk());
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to generate splits for " + 
tableId, e);
+        } finally {
+            try {
+                splitter.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close splitter for {}", tableId, e);
             }
-        } else {
-            BinlogOffset startingOffset = 
remainingBinlogSplit.getStartingOffset();
-            BinlogSplit binlogSplit = new BinlogSplit();
-            binlogSplit.setSplitId(remainingBinlogSplit.splitId());
-            binlogSplit.setStartingOffset(startingOffset.getOffset());
-            splits.add(binlogSplit);
         }
-        return splits;
+    }
+
+    /**
+     * null start -> NO_SPLITTING_TABLE_STATE (analyze + maybe evenly); 
non-null -> resume
+     * mid-table. Cast pkValues[0] back to the JDBC driver's natural type 
(JSON round-trip
+     * downgrades types).
+     */
+    private ChunkSplitterState buildChunkSplitterState(
+            MySqlSourceConfig sourceConfig,
+            TableId tableId,
+            FetchTableSplitsRequest ftsReq,
+            MySqlSchema mySqlSchema,
+            MySqlPartition partition) {
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        if (pkValues == null || pkValues.length == 0) {
+            return ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
+        }
+        Column splitColumn;
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+            splitColumn =
+                    ChunkUtils.getChunkKeyColumn(
+                            mySqlSchema.getTableSchema(partition, jdbc, 
tableId).getTable(),
+                            sourceConfig.getChunkKeyColumns());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to resolve split column for " + 
tableId, e);
+        }
+        Class<?> targetClass = resolveSplitKeyClass(tableId, splitColumn, 
ftsReq);
+        Object castStart = objectMapper.convertValue(pkValues[0], targetClass);
+        int splitId = ftsReq.getNextSplitId() == null ? 0 : 
ftsReq.getNextSplitId();

Review Comment:
   Same temporal-boundary problem as the generic JDBC reader: async split 
resume casts the JSON-round-tripped split start with 
`objectMapper.convertValue` directly, while the existing snapshot split 
reconstruction uses `convertBounds()` to handle DATE/TIME/TIMESTAMP strings. 
MySQL async splitting on a temporal chunk key can fail or resume with the wrong 
boundary after the first batch. Please use the helper here too, for example:
   ```suggestion
           Object castStart = convertBounds(pkValues, targetClass, objectMapper)[0];
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to