[ 
https://issues.apache.org/jira/browse/FLINK-39478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083162#comment-18083162
 ] 

Jubin Soni commented on FLINK-39478:
------------------------------------

I have created a PR that addresses fixes 1 and 2 suggested above: 
https://github.com/apache/flink/pull/28241

> Newly-added tables become ghost tables after job restart during initial 
> snapshot phase
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-39478
>                 URL: https://issues.apache.org/jira/browse/FLINK-39478
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>         Environment: * Flink CDC version: 3.4.0 (but also seen on the latest)
>  * {{scan.newly-added-table.enabled: true}}
>  * Checkpoint interval: 5 minutes, exactly-once, RocksDB backend
>  * Parallelism: 4
>  * Restarts triggered by both external savepoint-cancel and pod 
> replacement/checkpoint-restore
>            Reporter: Vinay Sagar Gonabavi
>            Priority: Critical
>              Labels: pull-request-available
>
> *Summary*
> When {{scan.newly-added-table.enabled=true}} and a job restarts 
> (savepoint-cancel or checkpoint-restore) while a newly-added table is 
> mid-snapshot, the table can silently stop emitting events. The table remains 
> in the pipeline config and the job runs healthily for all other tables, but 
> the affected table produces zero data indefinitely.
> *Production Evidence*
> Two tables independently exhibited this behavior:
> ||Table||Rows||Snapshot duration||Paimon snapshots before death||Death 
> trigger||Data after restart||
> |{{table_1}}|100M+|multi-day|96 over 2 days|savepoint-cancel restart (Feb 
> 27)|zero|
> |{{table_2}}|100M+|multi-day|201 over 1.5 days|survived 1st restart, died on 
> 2nd|zero|
>  * Job continues running healthily for all other tables (confirmed by Paimon 
> counts and operator metrics)
>  * ~60K records still flowing are all from working tables
>  * Both tables confirmed mid-snapshot at time of restart via S3 data
> Notably, {{table_2}} survived one restart and died on the second. This 
> non-determinism points to a timing/race condition in the recovery path rather 
> than a guaranteed failure. The outcome depends on exactly which state the 
> assigner was in at the checkpoint boundary.
> *Root Cause Analysis*
> The Flink CDC MySQL source uses a finite state machine 
> ({{{}AssignerStatus{}}}) to manage newly-added table snapshots. The state 
> progression is:
>  
> {code:java}
>  INITIAL_ASSIGNING_FINISHED (1)
>   → NEWLY_ADDED_ASSIGNING (2)          // snapshot splits being assigned
>   → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED (3)  // all splits done, awaiting binlog update
>   → NEWLY_ADDED_ASSIGNING_FINISHED (4) // binlog handshake complete{code}
> We were unable to determine the exact failing code path due to limited 
> observability into the assigner state at the time of the restarts. However, 
> code analysis reveals that a restart during states 2 or 3 exposes multiple 
> fragile recovery windows, any of which could permanently stall the state 
> machine:
> *1. Transient {{checkpointIdToFinish}} field 
> (MySqlSnapshotSplitAssigner.java)*
> This field gates the {{NEWLY_ADDED_ASSIGNING → 
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED}} transition but is {*}not 
> checkpointed{*}. After restore it's {{{}null{}}}, requiring 2 additional 
> checkpoint cycles to re-derive (one {{snapshotState()}} to set it, one 
> {{notifyCheckpointComplete()}} to act on it). If the job restarts again 
> within those 2 cycles, the window resets. This could explain why {{table_2}} 
> survived one restart but not the second: the first restart may have landed on 
> a clean boundary, while the second hit before the 2-cycle recovery completed.
> *2. Multi-step binlog update handshake between states 3→4* 
> ({*}MySqlSourceEnumerator.java{*})
> The transition from state 3 to 4 requires: enumerator sends 
> {{BinlogSplitUpdateRequestEvent}} → reader suspends binlog → reader requests 
> latest finished splits number → enumerator responds → reader updates binlog 
> split → reader sends {{BinlogSplitUpdateAckEvent}} → 
> {{{}onBinlogSplitUpdated(){}}}. A restart during this handshake restores 
> state 3, and {{MySqlHybridSplitAssigner.getNext()}} returns 
> {{Optional.empty()}} for all split requests until 
> {{requestBinlogSplitUpdateIfNeed()}} re-triggers the handshake via 
> {{{}syncWithReaders(){}}}.
> *3. {{captureNewlyAddedTables()}} guard skips re-discovery* 
> ({*}MySqlSnapshotSplitAssigner.java{*})
> {code:java}
> if (... && AssignerStatus.isAssigningFinished(assignerStatus)) { {code}
> This only passes for states 1 and 4. If the restored state is 2 or 3, the 
> entire newly-added table re-discovery is skipped. The code assumes 
> in-progress work will resume from checkpoint state, but as described above, 
> the recovery path depends on transient fields and multi-step coordination 
> that may not complete before the next restart.
> *4. Existing test acknowledges this fragility* 
> ({*}NewlyAddedTableITCase.java{*}):
> {code:java}
> // sleep 1s to wait for the assign status to INITIAL_ASSIGNING_FINISHED.
> // Otherwise, the restart job won't read newly added tables, and this test will be stuck.
> Thread.sleep(1000L); {code}
> The combination of transient fields, multi-step handshakes, and guarded 
> re-discovery creates multiple windows where a restart can permanently stall 
> the state machine. The non-deterministic behavior across our two affected 
> tables is consistent with this: the outcome depends on the exact assigner 
> state at the checkpoint boundary, which varies by timing.
> *Impact*
>  * Large tables (100M+ rows) are disproportionately affected because their 
> multi-day snapshot maximizes the window where a restart hits the vulnerable 
> state
>  * Silent failure - no errors logged, job appears healthy, only detectable by 
> monitoring per-table output
>  * Unrecoverable without dropping the table from the pipeline and re-adding 
> it (full re-snapshot)
>  
> *Suggested Fix Directions*
>  # Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}} so the 
> state machine can advance immediately after restore without waiting for 2 
> additional checkpoint cycles
>  # Make the binlog update handshake (state 3→4) idempotent and automatically 
> re-triggerable on restore, rather than relying on {{syncWithReaders()}} to 
> eventually re-trigger it
>  # Add a safety mechanism: if {{assignerStatus}} is in state 2 or 3 and no 
> progress (split assignments or finished reports) occurs within N checkpoint 
> cycles, log a warning and consider resetting the table's snapshot
>  # Expose {{assignerStatus}} and per-table snapshot progress as JMX/Flink 
> metrics - the lack of observability into the assigner state made this 
> extremely difficult to diagnose and made it impossible to determine the exact 
> failing code path
> *Workaround*
> When adding large tables with {{{}scan.newly-added-table.enabled{}}}, avoid 
> restarting the job until the initial snapshot completes. If a restart occurs 
> mid-snapshot and the table becomes a ghost, remove the table from the 
> pipeline config, restart, then re-add it to force a fresh snapshot.
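
For reference, the guard described in item 3 of the quoted root cause analysis can be modeled in isolation. The following is a minimal sketch, not the connector's code: `AssignerStatusSketch`, its nested `Status` enum, and `wouldRediscover` are hypothetical stand-ins that copy only the four state names and numbers quoted above. It assumes the elided part of the real condition reduces to the `scan.newly-added-table.enabled` flag.

```java
public class AssignerStatusSketch {

    // Toy stand-in for the connector's AssignerStatus; only the four
    // states quoted in the issue description are modeled.
    enum Status {
        INITIAL_ASSIGNING_FINISHED(1),
        NEWLY_ADDED_ASSIGNING(2),
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3),
        NEWLY_ADDED_ASSIGNING_FINISHED(4);

        final int code;

        Status(int code) {
            this.code = code;
        }
    }

    // Mirrors the behavior described in the issue: true only for states 1 and 4.
    static boolean isAssigningFinished(Status status) {
        return status == Status.INITIAL_ASSIGNING_FINISHED
                || status == Status.NEWLY_ADDED_ASSIGNING_FINISHED;
    }

    // Assumption: the elided part of the real guard is just the feature flag.
    static boolean wouldRediscover(boolean scanNewlyAddedTableEnabled, Status restored) {
        return scanNewlyAddedTableEnabled && isAssigningFinished(restored);
    }

    public static void main(String[] args) {
        // Print, for each restored state, whether re-discovery would run.
        for (Status s : Status.values()) {
            String outcome = wouldRediscover(true, s) ? "runs" : "is skipped";
            System.out.println("state " + s.code + " (" + s + "): re-discovery " + outcome);
        }
    }
}
```

Under these assumptions, a job restored in state 2 or 3 never re-enters table discovery and depends entirely on the in-progress work resuming correctly.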

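Suggested fix direction 2 in the quoted description asks for the state 3→4 handshake to be idempotent and re-triggerable on restore. A rough sketch of that property follows, using a toy enumerator rather than `MySqlSourceEnumerator`. Everything here is hypothetical: the class names, the `requestInFlight` flag, and the event list are illustrations. The method name `requestBinlogSplitUpdateIfNeed` is borrowed from the description only as a label, and the intermediate reader steps of the real handshake are collapsed into a single ack.

```java
import java.util.ArrayList;
import java.util.List;

public class HandshakeRetriggerSketch {

    enum Status { SNAPSHOT_FINISHED /* state 3 */, FINISHED /* state 4 */ }

    // Toy enumerator: only the restored status survives a restart; the
    // in-flight flag is deliberately not part of the restored state.
    static final class ToyEnumerator {
        Status status;
        boolean requestInFlight = false;
        final List<String> sentEvents = new ArrayList<>();

        ToyEnumerator(Status restoredStatus) {
            this.status = restoredStatus;
        }

        // Safe to call on restore and on every periodic sync: it sends at
        // most one request per enumerator instance while in state 3.
        void requestBinlogSplitUpdateIfNeed() {
            if (status == Status.SNAPSHOT_FINISHED && !requestInFlight) {
                sentEvents.add("BinlogSplitUpdateRequestEvent");
                requestInFlight = true;
            }
        }

        // Duplicate or late acks are ignored once state 4 is reached.
        void onBinlogSplitUpdateAck() {
            if (status == Status.SNAPSHOT_FINISHED) {
                status = Status.FINISHED;
                requestInFlight = false;
            }
        }
    }

    public static void main(String[] args) {
        // First run: request sent, then the job restarts before the ack.
        ToyEnumerator beforeRestart = new ToyEnumerator(Status.SNAPSHOT_FINISHED);
        beforeRestart.requestBinlogSplitUpdateIfNeed();

        // Restore in state 3: the request is re-sent immediately.
        ToyEnumerator restored = new ToyEnumerator(Status.SNAPSHOT_FINISHED);
        restored.requestBinlogSplitUpdateIfNeed();
        restored.requestBinlogSplitUpdateIfNeed(); // repeated call is a no-op
        restored.onBinlogSplitUpdateAck();
        restored.onBinlogSplitUpdateAck();         // duplicate ack is a no-op
        System.out.println(restored.status + " after " + restored.sentEvents.size() + " request(s)");
    }
}
```

The design point is that the trigger depends only on restored state, so calling it unconditionally during restore cannot send duplicate requests or skip the transition.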

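Suggested fix direction 3 (warn when states 2 or 3 make no progress for N checkpoint cycles) could look roughly like the sketch below. This is an assumption-laden illustration, not an existing Flink CDC facility: `StallWatchdogSketch`, `maxIdleCheckpoints`, and the `progressCounter` input (a running count of split assignments plus finished-split reports) are all hypothetical names.

```java
public class StallWatchdogSketch {

    private final int maxIdleCheckpoints;   // the "N" from the suggestion
    private long lastProgressCounter = -1L; // -1 means nothing observed yet
    private int idleCheckpoints = 0;

    StallWatchdogSketch(int maxIdleCheckpoints) {
        this.maxIdleCheckpoints = maxIdleCheckpoints;
    }

    /**
     * Call once per completed checkpoint.
     *
     * @param inVulnerableState true when the assigner is in state 2 or 3
     * @param progressCounter   hypothetical running count of split
     *                          assignments plus finished-split reports
     * @return true when a stall warning should be raised
     */
    boolean onCheckpointComplete(boolean inVulnerableState, long progressCounter) {
        if (!inVulnerableState || progressCounter != lastProgressCounter) {
            // Either nothing to watch, or progress was made: reset the count.
            lastProgressCounter = progressCounter;
            idleCheckpoints = 0;
            return false;
        }
        idleCheckpoints++;
        return idleCheckpoints >= maxIdleCheckpoints;
    }

    public static void main(String[] args) {
        StallWatchdogSketch watchdog = new StallWatchdogSketch(3);
        long[] counters = {5, 5, 5, 5}; // no progress after the first checkpoint
        for (long c : counters) {
            if (watchdog.onCheckpointComplete(true, c)) {
                System.out.println("WARN: newly-added table snapshot appears stalled");
            }
        }
    }
}
```

Whether a stall should only be logged or should also reset the table's snapshot is left open here, as it is in the description.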

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
