[ https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081776#comment-18081776 ]
袁焊忠 commented on FLINK-39645:
-----------------------------
I can try to take this one. The reproduction path is the recovery window where
HybridSourceSplit instances are buffered in restoredSplits while currentReader
is still null, and snapshotState currently snapshots an empty delegate state in
that window.
I'll start with a focused regression test that snapshots after
addSplits()/start() but before the SwitchSourceEvent, then keep the code change
limited to preserving the restored splits in the checkpoint state.
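A minimal sketch of that regression test and of the proposed direction follows. It uses a hypothetical, dependency-free model rather than Flink's classes: `PreserveRestoredSplitsSketch`, `ModelReader` and `snapshotKeepsRestoredSplits` are illustrative names, and splits are plain strings. It assumes the buffered restoredSplits are already in the form the checkpoint expects, so they can be returned as-is while currentReader is null. The real change may need to wrap or filter them differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: only the field names come from the issue.
public class PreserveRestoredSplitsSketch {

    static class ModelReader {
        final List<String> restoredSplits = new ArrayList<>();
        Object currentReader = null;  // null until a SwitchSourceEvent arrives
        int currentSourceIndex = -1;  // -1: no source activated yet

        // With no active source, restored splits are only buffered.
        void addSplits(List<String> splits) {
            if (currentSourceIndex == -1) {
                restoredSplits.addAll(splits);
            }
        }

        List<String> snapshotState(long checkpointId) {
            if (currentReader == null) {
                // Nothing has been handed to a delegate reader yet, so the
                // buffered splits are the complete state. Return a copy so later
                // changes to the buffer cannot alter the snapshot.
                return new ArrayList<>(restoredSplits);
            }
            throw new UnsupportedOperationException("delegate reader not modelled");
        }
    }

    // Regression check: snapshot after addSplits() but before any source switch.
    static boolean snapshotKeepsRestoredSplits() {
        ModelReader reader = new ModelReader();
        List<String> restored = List.of("split-0", "split-1");
        reader.addSplits(restored);
        // start() would now request a SwitchSourceEvent; a checkpoint arrives first.
        return reader.snapshotState(2L).equals(restored);
    }

    public static void main(String[] args) {
        System.out.println("restored splits preserved: " + snapshotKeepsRestoredSplits());
    }
}
```

In this model `main` prints `restored splits preserved: true`; the same check against the current logic (empty list when currentReader is null) would fail.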
> HybridSourceReader.snapshotState() loses recovered splits when currentReader
> is null
> ------------------------------------------------------------------------------------
>
> Key: FLINK-39645
> URL: https://issues.apache.org/jira/browse/FLINK-39645
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Reporter: Chen Zhang
> Priority: Minor
>
> h3. Summary
> HybridSourceReader.snapshotState() can return an empty split list during
> recovery, permanently losing splits stored in the restoredSplits field. This
> leads to silent data loss under repeated failover scenarios.
> h3. Reproduction Scenario
> 1. A Flink job using HybridSource takes a checkpoint successfully.
> 2. The job fails and restores from the checkpoint.
> 3. During recovery, addSplits() is called. Since currentSourceIndex == -1,
> splits are buffered into restoredSplits (not forwarded to any reader).
> 4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a
> SwitchSourceEvent to activate the appropriate source reader.
> 5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
> 6. snapshotState() finds currentReader == null and returns
> Collections.emptyList(), ignoring restoredSplits entirely.
> 7. If the job fails again and restores from this new checkpoint, the buffered
> splits are gone forever.
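The scenario above can be modelled without a Flink dependency. This is a sketch under stated assumptions: `RecoveryWindowRepro` and `ModelReader` are hypothetical names, splits are plain strings rather than `HybridSourceSplit` objects, and only the field names and the null-reader branch come from the issue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the recovery window; not Flink code.
public class RecoveryWindowRepro {

    static class ModelReader {
        final List<String> restoredSplits = new ArrayList<>();
        Object currentReader = null;  // stays null until a SwitchSourceEvent arrives
        int currentSourceIndex = -1;  // -1: no source activated yet

        // Step 3: with no active source, restored splits are only buffered.
        void addSplits(List<String> splits) {
            if (currentSourceIndex == -1) {
                restoredSplits.addAll(splits);
            }
        }

        // Steps 5-6: mirrors the reported logic; restoredSplits is never consulted.
        List<String> snapshotState(long checkpointId) {
            if (currentReader == null) {
                return Collections.emptyList();
            }
            throw new UnsupportedOperationException("delegate reader not modelled");
        }
    }

    public static void main(String[] args) {
        List<String> checkpoint1 = List.of("split-0", "split-1");  // step 1

        // Steps 2-6: restore from checkpoint 1, then checkpoint 2 fires
        // before the source switch.
        ModelReader firstRestore = new ModelReader();
        firstRestore.addSplits(checkpoint1);
        List<String> checkpoint2 = firstRestore.snapshotState(2L);

        // Step 7: fail again and restore from checkpoint 2.
        ModelReader secondRestore = new ModelReader();
        secondRestore.addSplits(checkpoint2);

        System.out.println("buffered after 1st restore: " + firstRestore.restoredSplits.size());
        System.out.println("splits in checkpoint 2: " + checkpoint2.size());
        System.out.println("buffered after 2nd restore: " + secondRestore.restoredSplits.size());
    }
}
```

Running `main` prints 2, 0 and 0 for the three counters: the splits are buffered after the first restore, absent from checkpoint 2, and gone after the second restore.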
> h3. Root Cause
> In HybridSourceReader.java, lines 109-114:
> {code:java}
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     List<? extends SourceSplit> state =
>             currentReader != null
>                     ? currentReader.snapshotState(checkpointId)
>                     : Collections.emptyList();
>     return HybridSourceSplit.wrapSplits(state, currentSourceIndex,
>             switchedSources);
> }
> {code}
> When currentReader is null (the normal state between recovery and the source
> switch), the method snapshots an empty list. The restoredSplits field, which
> holds splits recovered from the previous checkpoint but not yet assigned to a
> reader, is completely excluded from the snapshot.
> h3. Impact
> - Silent data loss: splits are dropped without any error or warning
> - Most likely to surface in unstable environments with frequent restarts,
> where the window between recovery and source switching is hit by consecutive
> failures
> - Affects all HybridSource users
> h3. Suggested Fix
> Throw an exception from snapshotState() when currentReader is null, so that a
> checkpoint in this window fails loudly instead of silently dropping restoredSplits.
> {code:java}
> @Override
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     if (currentReader == null) {
>         // Fail the checkpoint instead of snapshotting an empty split list.
>         throw new IllegalStateException(
>                 "currentReader can't be null when snapshotting state");
>     }
>     List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
>     return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
> }
> {code}
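For comparison, the suggested fail-fast behaviour can be checked in the same kind of hypothetical model. `FailFastSnapshotSketch`, `ModelReader` and `snapshotInWindowIsRejected` are illustrative names, not Flink's API; only the exception type and the null check come from the suggestion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the suggested fail-fast fix; not Flink code.
public class FailFastSnapshotSketch {

    static class ModelReader {
        final List<String> restoredSplits = new ArrayList<>();
        Object currentReader = null;  // null during the recovery window

        List<String> snapshotState(long checkpointId) {
            if (currentReader == null) {
                throw new IllegalStateException(
                        "currentReader can't be null when snapshotting state");
            }
            throw new UnsupportedOperationException("delegate reader not modelled");
        }
    }

    // Returns true if a snapshot taken in the window is rejected, so no empty
    // split list is ever produced for that checkpoint.
    static boolean snapshotInWindowIsRejected() {
        ModelReader reader = new ModelReader();
        reader.restoredSplits.add("split-0");
        try {
            reader.snapshotState(2L);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("snapshot rejected in window: " + snapshotInWindowIsRejected());
    }
}
```

The trade-off: the buffered splits are never checkpointed as empty, but every checkpoint triggered inside the window fails instead of completing, which may itself lead to failover depending on the job's tolerance for failed checkpoints. Preserving restoredSplits in the snapshot, as proposed in the comment, avoids those failed checkpoints.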
--
This message was sent by Atlassian Jira
(v8.20.10#820010)