github-actions[bot] commented on code in PR #61389:
URL: https://github.com/apache/doris/pull/61389#discussion_r2944732897
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -82,8 +82,9 @@ private static boolean isValidValue(String key, String value)
{
}
if (key.equals(DataSourceConfigKeys.OFFSET)
- && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
- || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+ && !(value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_INITIAL)
+ || value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_LATEST)
+ || value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
Review Comment:
**[Missing `earliest` in validator]** `OFFSET_EARLIEST` is still missing
from the validation whitelist. The comment in `DataSourceConfigKeys.java:30`
says `// initial,earliest,latest,snapshot,...` but the validator only allows
`initial`, `latest`, and `snapshot`. This means `offset=earliest` would be
rejected at job creation time.
This is a pre-existing issue (not introduced by this PR), but since this PR
already touches this exact block and adds `snapshot`, it would be a good
opportunity to also add `OFFSET_EARLIEST`:
```java
|| value.equalsIgnoreCase(DataSourceConfigKeys.OFFSET_EARLIEST)
```
Also, the change from `equals` to `equalsIgnoreCase` is a minor behavioral
expansion: values such as `INITIAL` or `Initial` were previously rejected and
are now accepted. This is arguably correct but worth noting.
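For illustration, a minimal sketch of the whitelist with `earliest` included
and case-insensitive matching. The class name and the literal strings are
assumptions taken from the comment quoted above; the real constants live in
`DataSourceConfigKeys` and may differ, and any further modes hidden behind the
`...` in that comment are not covered here.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for the offset check in DataSourceConfigValidator.
// The literal values below are assumed, not copied from DataSourceConfigKeys.
final class OffsetWhitelistSketch {
    private static final Set<String> ALLOWED_OFFSETS =
            Set.of("initial", "earliest", "latest", "snapshot");

    // Returns true when the value names one of the allowed modes, ignoring case.
    static boolean isValidOffset(String value) {
        if (value == null) {
            return false;
        }
        // Locale.ROOT keeps the lowercase conversion independent of the JVM locale.
        return ALLOWED_OFFSETS.contains(value.toLowerCase(Locale.ROOT));
    }
}
```

Keeping the allowed values in one set means a newly added mode cannot be
forgotten in a chain of `||` conditions, which is how `earliest` appears to
have been missed.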
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -116,7 +116,8 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
nextOffset.setSplits(snapshotSplits);
return nextOffset;
} else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // snapshot to binlog
+ // initial mode: snapshot to binlog
+ // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
Review Comment:
**[Comment-only guard for snapshot-only mode — consider adding defensive
assertion]** The comment says "snapshot-only mode must be intercepted by
hasReachedEnd() before reaching here" but there's no actual code guard. If a
future code change introduces a path that calls `getNextOffset()` without first
checking `hasReachedEnd()`, snapshot-only mode would silently create a
`BinlogSplit`, which is semantically incorrect.
Consider adding a check here:
```java
} else if (currentOffset != null && currentOffset.snapshotSplit()) {
    if (isSnapshotOnlyMode()) {
        throw new IllegalStateException(
                "snapshot-only mode should not reach binlog phase; "
                        + "hasReachedEnd() was not checked");
    }
    // initial mode: snapshot to binlog
    ...
```
Per project conventions: error means failure, never allow silent
continuation on invariant violations.
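To make the intended fail-fast behavior concrete, here is a self-contained
sketch that can be run in isolation. `SnapshotGuardSketch`, `Phase`, and
`nextPhase` are hypothetical names invented for this example; they only mimic
the snapshot-to-binlog transition and are not the actual
`JdbcSourceOffsetProvider` API.

```java
// Hypothetical miniature of the snapshot-to-binlog transition with a guard.
final class SnapshotGuardSketch {
    enum Phase { SNAPSHOT, BINLOG }

    private final boolean snapshotOnly;

    SnapshotGuardSketch(boolean snapshotOnly) {
        this.snapshotOnly = snapshotOnly;
    }

    // Computes the phase that follows the current one.
    Phase nextPhase(Phase current) {
        if (current == Phase.SNAPSHOT) {
            if (snapshotOnly) {
                // Invariant violation: the caller should have stopped at the
                // end of the snapshot instead of asking for the next offset.
                throw new IllegalStateException(
                        "snapshot-only mode should not reach binlog phase; "
                                + "hasReachedEnd() was not checked");
            }
            // initial mode: snapshot to binlog
            return Phase.BINLOG;
        }
        return current;
    }
}
```

With the guard in place, a caller that skips the end-of-snapshot check gets an
exception instead of a silently created binlog phase.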
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -376,10 +380,16 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
if (!lastSnapshotSplits.isEmpty()) {
currentOffset.setSplits(lastSnapshotSplits);
} else {
- // when snapshot to binlog phase fe restarts
- BinlogSplit binlogSplit = new BinlogSplit();
- binlogSplit.setFinishedSplits(finishedSplits);
- currentOffset.setSplits(Collections.singletonList(binlogSplit));
+ if (!isSnapshotOnlyMode()) {
+ // initial mode: rebuild binlog split for snapshot-to-binlog transition
+ BinlogSplit binlogSplit = new BinlogSplit();
+ binlogSplit.setFinishedSplits(finishedSplits);
+ currentOffset.setSplits(Collections.singletonList(binlogSplit));
+ } else {
+ // snapshot-only mode: all splits finished, restore from finishedSplits
+ // so snapshotSplit() works correctly and hasReachedEnd() can detect completion
+ currentOffset.setSplits(new ArrayList<>(finishedSplits));
Review Comment:
**[Replay correctness for snapshot-only mode]** When all splits are finished
in snapshot-only mode, the code sets:
```java
currentOffset.setSplits(new ArrayList<>(finishedSplits));
```
This makes `snapshotSplit()` return true (because the splits list contains
`SnapshotSplit` instances), which is critical for `hasReachedEnd()` to work.
However, this also means `getShowCurrentOffset()` (line 134) would serialize
and display all finished snapshot splits as the "current offset", which could
be a very large JSON string for tables with many chunks. This is a cosmetic
concern, not a correctness issue, but worth being aware of.
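The replay branch discussed above can be sketched in isolation as follows.
Everything here is a simplified assumption: `ReplaySketch`, the `Split` types,
and the rule used by `snapshotSplit` (first element is a `SnapshotSplit`) are
stand-ins for illustration and do not reproduce the real Doris classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the replay branch taken when no unfinished
// snapshot splits remain after an FE restart.
final class ReplaySketch {
    interface Split {}
    static final class SnapshotSplit implements Split {}
    static final class BinlogSplit implements Split {}

    // Assumed rule: an offset is a snapshot offset when it holds SnapshotSplit instances.
    static boolean snapshotSplit(List<Split> splits) {
        return !splits.isEmpty() && splits.get(0) instanceof SnapshotSplit;
    }

    // Rebuilds the current splits according to the job mode.
    static List<Split> replay(boolean snapshotOnly, List<SnapshotSplit> finishedSplits) {
        if (snapshotOnly) {
            // snapshot-only mode: restore every finished split
            return new ArrayList<>(finishedSplits);
        }
        // initial mode: a single binlog split replaces the snapshot splits
        List<Split> splits = new ArrayList<>();
        splits.add(new BinlogSplit());
        return splits;
    }
}
```

The sketch also shows where the size concern comes from: in snapshot-only mode
the restored list grows with the number of finished chunks, while in initial
mode it always holds a single element.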
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -789,8 +789,9 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
// setting startMode
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
Review Comment:
**[Behavioral change to existing `initial` mode]** This line was previously
commented out (`// configFactory.startupOptions(StartupOptions.initial())`) and
is now uncommented. This means the `initial` startup mode behavior changes for
*all* existing MySQL streaming jobs, not just the new `snapshot` mode.
With the Flink CDC library, `StartupOptions.initial()` explicitly sets
`StartupMode.INITIAL` on the config. Previously, the default startup mode of
`MySqlSourceConfigFactory` was used (which happens to be INITIAL anyway based
on the Flink CDC source). So this is likely a no-op in practice, but it's worth
confirming that the default was indeed INITIAL. If the default was something
else (or if a future Flink CDC version changes the default), this uncommented
line changes behavior for all existing `initial` mode jobs.
Please confirm this is intentional and that the Flink CDC default for
`MySqlSourceConfigFactory.startupOptions` is `StartupOptions.initial()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]