github-actions[bot] commented on code in PR #61389:
URL: https://github.com/apache/doris/pull/61389#discussion_r2944732897


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -82,8 +82,9 @@ private static boolean isValidValue(String key, String value) {
         }
 
         if (key.equals(DataSourceConfigKeys.OFFSET)
-                && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
-                || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+                && !(value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_INITIAL)
+                || value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_LATEST)
+                || value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {

Review Comment:
   **[Missing `earliest` in validator]** `OFFSET_EARLIEST` is still missing 
from the validation whitelist. The comment in `DataSourceConfigKeys.java:30` 
says `// initial,earliest,latest,snapshot,...` but the validator only allows 
`initial`, `latest`, and `snapshot`. This means `offset=earliest` would be 
rejected at job creation time.
   
   This is a pre-existing issue (not introduced by this PR), but since this PR 
already touches this exact block and adds `snapshot`, it would be a good 
opportunity to also add `OFFSET_EARLIEST`:
   ```java
   || value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_EARLIEST)
   ```
   
   Also, the `equalsIgnoreCase` change (from `equals`) is a minor behavioral 
expansion — previously `INITIAL` or `Initial` would be rejected, now they're 
accepted. This is arguably correct but worth noting.
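   
   A minimal, self-contained sketch of the whitelist suggested above. The class 
name `OffsetValidatorSketch` and the literal values are assumptions for 
illustration; the real constants live in `DataSourceConfigKeys`, and any 
further values elided in that file's comment are not modelled here. It 
lower-cases with `Locale.ROOT`, which for these ASCII values behaves like the 
`equalsIgnoreCase` chain in the diff.
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Locale;
   import java.util.Set;

   // Hypothetical stand-in for the validator; not the project's actual class.
   class OffsetValidatorSketch {
       // Assumed literal values; the real ones are constants in DataSourceConfigKeys.
       static final Set<String> ALLOWED_OFFSETS =
               new HashSet<>(Arrays.asList("initial", "earliest", "latest", "snapshot"));

       static boolean isValidOffset(String value) {
           // Case-insensitive match against the whitelist; null is rejected.
           return value != null
                   && ALLOWED_OFFSETS.contains(value.toLowerCase(Locale.ROOT));
       }

       public static void main(String[] args) {
           System.out.println(isValidOffset("earliest"));
           System.out.println(isValidOffset("Initial"));
           System.out.println(isValidOffset("timestamp"));
       }
   }
   ```
   Keeping the allowed values in one set means a newly documented offset only 
needs to be added in one place, instead of extending a boolean chain.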



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -116,7 +116,8 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
             nextOffset.setSplits(snapshotSplits);
             return nextOffset;
         } else if (currentOffset != null && currentOffset.snapshotSplit()) {
-            // snapshot to binlog
+            // initial mode: snapshot to binlog
+            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here

Review Comment:
   **[Comment-only guard for snapshot-only mode — consider adding defensive 
assertion]** The comment says "snapshot-only mode must be intercepted by 
hasReachedEnd() before reaching here" but there's no actual code guard. If a 
future code change introduces a path that calls `getNextOffset()` without first 
checking `hasReachedEnd()`, snapshot-only mode would silently create a 
`BinlogSplit`, which is semantically incorrect.
   
   Consider adding a check here:
   ```java
   } else if (currentOffset != null && currentOffset.snapshotSplit()) {
       if (isSnapshotOnlyMode()) {
           throw new IllegalStateException(
               "snapshot-only mode should not reach binlog phase; "
                   + "hasReachedEnd() was not checked");
       }
       // initial mode: snapshot to binlog
       ...
   ```
   Per project conventions, an error must surface as a failure: never continue 
silently when an invariant is violated.



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -376,10 +380,16 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
                         if (!lastSnapshotSplits.isEmpty()) {
                             currentOffset.setSplits(lastSnapshotSplits);
                         } else {
-                            // when snapshot to binlog phase fe restarts
-                            BinlogSplit binlogSplit = new BinlogSplit();
-                            binlogSplit.setFinishedSplits(finishedSplits);
-                            currentOffset.setSplits(Collections.singletonList(binlogSplit));
+                            if (!isSnapshotOnlyMode()) {
+                                // initial mode: rebuild binlog split for snapshot-to-binlog transition
+                                BinlogSplit binlogSplit = new BinlogSplit();
+                                binlogSplit.setFinishedSplits(finishedSplits);
+                                currentOffset.setSplits(Collections.singletonList(binlogSplit));
+                            } else {
+                                // snapshot-only mode: all splits finished, restore from finishedSplits
+                                // so snapshotSplit() works correctly and hasReachedEnd() can detect completion
+                                currentOffset.setSplits(new ArrayList<>(finishedSplits));

Review Comment:
   **[Replay correctness for snapshot-only mode]** When all splits are finished 
in snapshot-only mode, the code sets:
   ```java
   currentOffset.setSplits(new ArrayList<>(finishedSplits));
   ```
   This makes `snapshotSplit()` return true (because the splits list contains 
`SnapshotSplit` instances), which is critical for `hasReachedEnd()` to work. 
However, this also means `getShowCurrentOffset()` (line 134) would serialize 
and display all finished snapshot splits as the "current offset", which could 
be a very large JSON string for tables with many chunks. This is a cosmetic 
concern, not a correctness issue, but worth being aware of.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -789,8 +789,9 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
         // setting startMode
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {

Review Comment:
   **[Behavioral change to existing `initial` mode]** This line was previously 
commented out (`// configFactory.startupOptions(StartupOptions.initial())`) and 
is now uncommented. This means the `initial` startup mode behavior changes for 
*all* existing MySQL streaming jobs, not just the new `snapshot` mode.
   
   With the Flink CDC library, `StartupOptions.initial()` explicitly sets 
`StartupMode.INITIAL` on the config. Previously, the default startup mode of 
`MySqlSourceConfigFactory` was used (which happens to be INITIAL anyway based 
on the Flink CDC source). So this is likely a no-op in practice, but it's worth 
confirming that the default was indeed INITIAL. If the default was something 
else (or if a future Flink CDC version changes the default), this uncommented 
line changes behavior for all existing `initial` mode jobs.
   
   Please confirm this is intentional and that the Flink CDC default for 
`MySqlSourceConfigFactory.startupOptions` is `StartupOptions.initial()`.
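   
   To illustrate why an explicit mapping is more robust than relying on a 
library default, here is a minimal sketch. `StartupModeSketch`, its `Mode` 
enum, and `resolve` are hypothetical names and do not use the Flink CDC API; 
only the three offset values discussed in this review are modelled, and the 
literal strings are assumptions.
   ```java
   import java.util.Locale;

   // Hypothetical model of startup-mode selection; not the Flink CDC API.
   class StartupModeSketch {
       enum Mode { INITIAL, LATEST, SNAPSHOT }

       // Maps every supported offset value explicitly, so no branch
       // silently falls back to whatever a library default happens to be.
       static Mode resolve(String offset) {
           if (offset == null) {
               throw new IllegalArgumentException("offset must be set explicitly");
           }
           switch (offset.toLowerCase(Locale.ROOT)) {
               case "initial":
                   return Mode.INITIAL;
               case "latest":
                   return Mode.LATEST;
               case "snapshot":
                   return Mode.SNAPSHOT;
               default:
                   throw new IllegalArgumentException("unsupported offset: " + offset);
           }
       }

       public static void main(String[] args) {
           System.out.println(resolve("Initial"));
           System.out.println(resolve("snapshot"));
       }
   }
   ```
   With this shape, a change in an upstream default cannot alter the behavior 
of existing jobs, because every mode is chosen by the caller.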



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

