[ 
https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated FLINK-39645:
-------------------------------
    Description: 
h3. Summary

HybridSourceReader.snapshotState() can return an empty split list during 
recovery, permanently losing splits stored in the restoredSplits field. This 
leads to silent data loss under repeated failover scenarios.
h3. Reproduction Scenario

1. A Flink job using HybridSource takes a checkpoint successfully.
2. The job fails and restores from the checkpoint.
3. During recovery, addSplits() is called. Since currentSourceIndex == -1, 
splits are buffered into restoredSplits (not forwarded to any reader).
4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a 
SwitchSourceEvent to activate the appropriate source reader.
5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
6. snapshotState() finds currentReader == null and returns 
Collections.emptyList(), ignoring restoredSplits entirely.
7. If the job fails again and restores from this new checkpoint, the buffered 
splits are gone forever.
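
The sequence above can be modeled without Flink. The following is a minimal sketch, not the actual HybridSourceReader: ToyHybridReader, onSwitchSource(), and the String split names are hypothetical stand-ins, and the switch logic is simplified so that all buffered splits are handed to the active reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical toy model of the reader; it only mirrors the buffering logic described above. */
class ToyHybridReader {
    // Splits restored from a checkpoint that no underlying reader owns yet.
    private final List<String> restoredSplits = new ArrayList<>();
    // null models currentReader == null (no source activated yet).
    private List<String> activeReaderSplits = null;

    void addSplits(List<String> splits) {
        if (activeReaderSplits == null) {
            restoredSplits.addAll(splits); // step 3: buffered, not forwarded
        } else {
            activeReaderSplits.addAll(splits);
        }
    }

    /** Models the arrival of SwitchSourceEvent (simplified: all buffered splits are handed over). */
    void onSwitchSource() {
        activeReaderSplits = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    /** Mirrors the reported behavior: restoredSplits is never consulted. */
    List<String> snapshotState() {
        if (activeReaderSplits == null) {
            return Collections.emptyList(); // step 6
        }
        return new ArrayList<>(activeReaderSplits);
    }
}

class SnapshotGapDemo {
    /** Steps 2 to 6: restore, then checkpoint before the switch event arrives. */
    static List<String> checkpointBeforeSwitch(List<String> checkpointed) {
        ToyHybridReader reader = new ToyHybridReader();
        reader.addSplits(checkpointed);
        return reader.snapshotState();
    }

    /** Same restore, but the switch event arrives before the checkpoint. */
    static List<String> checkpointAfterSwitch(List<String> checkpointed) {
        ToyHybridReader reader = new ToyHybridReader();
        reader.addSplits(checkpointed);
        reader.onSwitchSource();
        return reader.snapshotState();
    }

    public static void main(String[] args) {
        List<String> checkpointed = Arrays.asList("split-0", "split-1");
        System.out.println("before switch: " + checkpointBeforeSwitch(checkpointed));
        System.out.println("after switch:  " + checkpointAfterSwitch(checkpointed));
    }
}
```

In this model, a checkpoint taken before the switch records no splits at all, while the same checkpoint taken after the switch records both of them. Restoring from the first snapshot would start with nothing to read.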
h3. Root Cause

In HybridSourceReader.java, lines 109-114:
{code:java}
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    List<? extends SourceSplit> state =
            currentReader != null
                    ? currentReader.snapshotState(checkpointId)
                    : Collections.emptyList();
    return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
}
{code}
When currentReader is null (which is the normal state between recovery and 
source switch), the method snapshots an empty list. The restoredSplits field – 
which holds splits recovered from the previous checkpoint but not yet assigned 
to a reader – is completely excluded from the snapshot.
h3. Impact
 - Silent data loss: splits are dropped without any error or warning
 - Most likely to surface in unstable environments with frequent restarts, 
where the window between recovery and source switching is hit by consecutive 
failures
 - Affects all HybridSource users

h3. Suggested Fix

Throw an exception when snapshotState() is called while currentReader is null, instead of returning an empty list:
{code:java}
@Override
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    if (currentReader != null) {
        List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
        return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
    } else {
        // No reader is active yet, so restored splits are still buffered; refuse to snapshot.
        throw new IllegalStateException("currentReader must not be null when taking a snapshot");
    }
    }
}
{code}
Additionally, a unit test should be added that verifies snapshotState() no 
longer silently drops restoredSplits when no reader has been activated yet.
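
As a rough illustration of what such a test could check, here is a minimal sketch that exercises both candidate behaviors for the window in which no reader is active: throwing, and the alternative of including the buffered splits in the snapshot. BufferingReader and its two snapshot methods are hypothetical and do not use the Flink API; a real test would go through HybridSourceReader itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a reader that may hold restored splits without an active reader. */
class BufferingReader {
    private final List<String> restoredSplits = new ArrayList<>();
    // null models currentReader == null; this sketch never activates a reader.
    private List<String> activeReaderSplits = null;

    void addSplits(List<String> splits) {
        if (activeReaderSplits == null) {
            restoredSplits.addAll(splits);
        } else {
            activeReaderSplits.addAll(splits);
        }
    }

    /** Variant A: refuse to snapshot while restored splits have no owner. */
    List<String> snapshotOrThrow() {
        if (activeReaderSplits == null) {
            throw new IllegalStateException("no active reader; refusing to snapshot");
        }
        return new ArrayList<>(activeReaderSplits);
    }

    /** Variant B: persist the buffered splits so a later restore still sees them. */
    List<String> snapshotIncludingRestored() {
        if (activeReaderSplits == null) {
            return new ArrayList<>(restoredSplits);
        }
        return new ArrayList<>(activeReaderSplits);
    }
}

class SnapshotStrategyDemo {
    /** Returns true if variant A rejects a snapshot taken before any reader is active. */
    static boolean strictSnapshotThrows(List<String> restored) {
        BufferingReader reader = new BufferingReader();
        reader.addSplits(restored);
        try {
            reader.snapshotOrThrow();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Returns what variant B would write into the checkpoint in the same situation. */
    static List<String> lenientSnapshot(List<String> restored) {
        BufferingReader reader = new BufferingReader();
        reader.addSplits(restored);
        return reader.snapshotIncludingRestored();
    }

    public static void main(String[] args) {
        List<String> restored = Arrays.asList("split-0", "split-1");
        System.out.println("variant A throws: " + strictSnapshotThrows(restored));
        System.out.println("variant B snapshot: " + lenientSnapshot(restored));
    }
}
```

Either variant avoids a snapshot that quietly records an empty split list. They differ in whether the snapshot attempt is rejected or completes with the buffered splits included.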

  was:

h3. Summary

HybridSourceReader.snapshotState() can return an empty split list during 
recovery, permanently losing splits stored in the restoredSplits field. This 
leads to silent data loss under repeated failover scenarios.

h3. Reproduction Scenario

1. A Flink job using HybridSource takes a checkpoint successfully.
2. The job fails and restores from the checkpoint.
3. During recovery, addSplits() is called. Since currentSourceIndex == -1, 
splits are buffered into restoredSplits (not forwarded to any reader).
4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a 
SwitchSourceEvent to activate the appropriate source reader.
5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
6. snapshotState() finds currentReader == null and returns 
Collections.emptyList(), ignoring restoredSplits entirely.
7. If the job fails again and restores from this new checkpoint, the buffered 
splits are gone forever.

h3. Root Cause

In HybridSourceReader.java, lines 109-114:

{code:java}
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    List<? extends SourceSplit> state =
            currentReader != null
                    ? currentReader.snapshotState(checkpointId)
                    : Collections.emptyList();
    return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
}
{code}

When currentReader is null (which is the normal state between recovery and 
source switch), the method snapshots an empty list. The restoredSplits field -- 
which holds splits recovered from the previous checkpoint but not yet assigned 
to a reader -- is completely excluded from the snapshot.

h3. Impact

- Silent data loss: splits are dropped without any error or warning
- Most likely to surface in unstable environments with frequent restarts, where 
the window between recovery and source switching is hit by consecutive failures
- Affects all HybridSource users

h3. Suggested Fix

Include restoredSplits in the snapshot when currentReader is null:

{code:java}
@Override
public List<HybridSourceSplit> snapshotState(long checkpointId) {
    if (currentReader != null) {
        List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
        return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
    } else {
        return new ArrayList<>(restoredSplits);
    }
}
{code}

Additionally, a unit test should be added that verifies snapshotState() 
preserves restoredSplits when no reader has been activated yet.



> HybridSourceReader.snapshotState() loses recovered splits when currentReader 
> is null
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-39645
>                 URL: https://issues.apache.org/jira/browse/FLINK-39645
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>            Reporter: Chen Zhang
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
