Vinay Sagar Gonabavi created FLINK-39478:
--------------------------------------------

             Summary: Newly-added tables become ghost tables after job restart during initial snapshot phase
                 Key: FLINK-39478
                 URL: https://issues.apache.org/jira/browse/FLINK-39478
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
         Environment: * Flink CDC version: 3.4.0 (also seen on the latest version)
 * {{scan.newly-added-table.enabled: true}}
 * Checkpoint interval: 5 minutes, exactly-once, RocksDB backend
 * Parallelism: 4
 * Restarts triggered by both external savepoint-cancel and pod replacement/checkpoint-restore
            Reporter: Vinay Sagar Gonabavi


*Summary*

When {{scan.newly-added-table.enabled=true}} and a job restarts (savepoint-cancel or checkpoint-restore) while a newly-added table is mid-snapshot, the table can silently stop emitting events. The table remains in the pipeline config and the job keeps running healthily for all other tables, but the affected table produces no data indefinitely.

*Production Evidence*

Two tables independently exhibited this behavior:
||Table||Rows||Snapshot duration||Paimon snapshots before death||Death trigger||Data after restart||
|{{table_1}}|100M+|multi-day|96 over 2 days|savepoint-cancel restart (Feb 27)|zero|
|{{table_2}}|100M+|multi-day|201 over 1.5 days|survived 1st restart, died on 2nd|zero|
 * The job continues running healthily for all other tables (confirmed by Paimon counts and operator metrics)
 * The ~60K records still flowing all come from working tables
 * Both tables were confirmed mid-snapshot at the time of restart via S3 data

Notably, {{table_2}} survived one restart and died on the second. This non-determinism points to a timing/race condition in the recovery path rather than a guaranteed failure. The outcome depends on exactly which state the assigner was in at the checkpoint boundary.

*Root Cause Analysis*

The Flink CDC MySQL source uses a finite state machine ({{AssignerStatus}}) to manage newly-added table snapshots. The state progression is:
 
{code:java}
INITIAL_ASSIGNING_FINISHED (1)
  → NEWLY_ADDED_ASSIGNING (2)                    // snapshot splits being assigned
  → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED (3)  // all splits done, awaiting binlog update
  → NEWLY_ADDED_ASSIGNING_FINISHED (4)           // binlog handshake complete
{code}
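
To make the legal transitions explicit, here is a minimal Java sketch of the progression above. It is a simplified model, not the Flink CDC {{AssignerStatus}} enum: it covers only the four states and three transitions listed, the method names {{onNewTablesDiscovered()}} and {{onAllSplitsFinished()}} are hypothetical, and only {{onBinlogSplitUpdated()}} borrows a name mentioned in this issue.

{code:java}
// Simplified model of the newly-added-table progression; NOT the Flink CDC AssignerStatus enum.
// onNewTablesDiscovered() and onAllSplitsFinished() are hypothetical names.
enum AssignerStatusModel {
    INITIAL_ASSIGNING_FINISHED(1),
    NEWLY_ADDED_ASSIGNING(2),
    NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3),
    NEWLY_ADDED_ASSIGNING_FINISHED(4);

    final int code;

    AssignerStatusModel(int code) {
        this.code = code;
    }

    // 1 -> 2: newly added tables were discovered; their snapshot splits start being assigned.
    AssignerStatusModel onNewTablesDiscovered() {
        requireCurrent(INITIAL_ASSIGNING_FINISHED);
        return NEWLY_ADDED_ASSIGNING;
    }

    // 2 -> 3: all snapshot splits of the new tables are done.
    AssignerStatusModel onAllSplitsFinished() {
        requireCurrent(NEWLY_ADDED_ASSIGNING);
        return NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
    }

    // 3 -> 4: the binlog split update handshake was acknowledged.
    AssignerStatusModel onBinlogSplitUpdated() {
        requireCurrent(NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED);
        return NEWLY_ADDED_ASSIGNING_FINISHED;
    }

    // Rejects any transition that skips or repeats a step.
    private void requireCurrent(AssignerStatusModel expected) {
        if (this != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + this);
        }
    }
}
{code}

Walking 1 → 2 → 3 → 4 in order succeeds, while calling {{onBinlogSplitUpdated()}} directly from state 2 throws. Each recovery window discussed below is a point where this walk can stall.
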
We were unable to determine the exact failing code path due to limited observability into the assigner state at the time of the restarts. However, code analysis reveals that a restart during states 2 or 3 exposes multiple fragile recovery windows, any of which could permanently stall the state machine:

*1. Transient {{checkpointIdToFinish}} field (MySqlSnapshotSplitAssigner.java)*

This field gates the {{NEWLY_ADDED_ASSIGNING → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED}} transition but is *not checkpointed*. After restore it is {{null}}, so it takes 2 additional checkpoint cycles to re-derive (one {{snapshotState()}} to set it, one {{notifyCheckpointComplete()}} to act on it). If the job restarts again within those 2 cycles, the window resets. This could explain why {{table_2}} survived one restart but not the second: the first restart may have landed on a clean boundary, while the second hit before the 2-cycle recovery completed.
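
Point 1 can be illustrated with a small Java model. It is a hypothetical simplification, not {{MySqlSnapshotSplitAssigner}}: the class {{SnapshotFinishModel}}, its {{restore()}} helper and the integer status codes (taken from the progression above) are assumptions. It assumes, as described in point 1, that {{snapshotState()}} arms {{checkpointIdToFinish}} once all splits are finished and {{notifyCheckpointComplete()}} acts on it, and it treats only {{status}} and {{allSplitsFinished}} as checkpointed.

{code:java}
// Hypothetical, simplified model of point 1; NOT MySqlSnapshotSplitAssigner.
class SnapshotFinishModel {
    static final int ASSIGNING = 2;          // NEWLY_ADDED_ASSIGNING
    static final int SNAPSHOT_FINISHED = 3;  // NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED

    int status;                 // checkpointed
    boolean allSplitsFinished;  // checkpointed
    Long checkpointIdToFinish;  // transient: never written to the checkpoint

    // Restore from a checkpoint: checkpointed fields come back, the transient one stays null.
    static SnapshotFinishModel restore(int status, boolean allSplitsFinished) {
        SnapshotFinishModel m = new SnapshotFinishModel();
        m.status = status;
        m.allSplitsFinished = allSplitsFinished;
        return m;
    }

    // Checkpoint triggered: arm the transition, then return the status that gets persisted.
    int snapshotState(long checkpointId) {
        if (status == ASSIGNING && allSplitsFinished && checkpointIdToFinish == null) {
            checkpointIdToFinish = checkpointId;
        }
        return status;
    }

    // Checkpoint completed: fire the transition armed by an earlier (or the same) snapshot.
    void notifyCheckpointComplete(long checkpointId) {
        if (status == ASSIGNING && checkpointIdToFinish != null
                && checkpointId >= checkpointIdToFinish) {
            status = SNAPSHOT_FINISHED;
        }
    }
}
{code}

In this model the in-memory status reaches 3 on the first checkpoint after restore, but the snapshot taken at that checkpoint still records state 2; state 3 only becomes durable at the following checkpoint. A restart in between restores state 2 with a {{null}} field again, which is one way the 2-cycle window can reset.
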

*2. Multi-step binlog update handshake between states 3→4* ({*}MySqlSourceEnumerator.java{*})

The transition from state 3 to 4 requires the following sequence:
 # The enumerator sends {{BinlogSplitUpdateRequestEvent}}.
 # The reader suspends binlog reading.
 # The reader requests the latest number of finished splits.
 # The enumerator responds.
 # The reader updates its binlog split.
 # The reader sends {{BinlogSplitUpdateAckEvent}}, after which {{onBinlogSplitUpdated()}} runs.

A restart during this handshake restores state 3, and {{MySqlHybridSplitAssigner.getNext()}} returns {{Optional.empty()}} for all split requests (lines 111-113 of {{MySqlHybridSplitAssigner.java}}) until {{requestBinlogSplitUpdateIfNeed()}} re-triggers the handshake via {{syncWithReaders()}}.
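
The handshake can be modelled the same way. In this hypothetical Java sketch (not a Flink CDC API), only the assigner status is checkpointed, the steps mirror the sequence above, and {{getNext()}} stands in for {{MySqlHybridSplitAssigner.getNext()}} returning {{Optional.empty()}} while in state 3. The placeholder split id returned in state 4 is an assumption.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the state 3 -> 4 handshake; not a Flink CDC API.
class BinlogHandshakeModel {
    // One entry per message in the handshake, in order.
    enum Step {
        SEND_UPDATE_REQUEST,     // enumerator -> reader: BinlogSplitUpdateRequestEvent
        SUSPEND_BINLOG,          // reader suspends binlog reading
        REQUEST_FINISHED_COUNT,  // reader -> enumerator: latest number of finished splits
        RESPOND_FINISHED_COUNT,  // enumerator -> reader
        UPDATE_BINLOG_SPLIT,     // reader updates its binlog split
        SEND_UPDATE_ACK          // reader -> enumerator: BinlogSplitUpdateAckEvent
    }

    int status = 3;                                  // checkpointed
    final List<Step> completed = new ArrayList<>();  // in-flight progress, lost on restart

    // Stand-in for getNext(): no split is handed out while the status is 3.
    Optional<String> getNext() {
        return status == 3 ? Optional.empty() : Optional.of("placeholder-split");
    }

    // Deliver the next handshake message; the final ack moves the status to 4.
    void advance() {
        if (completed.size() == Step.values().length) {
            return; // handshake already complete
        }
        Step next = Step.values()[completed.size()];
        completed.add(next);
        if (next == Step.SEND_UPDATE_ACK) {
            status = 4;
        }
    }

    // Restart: only the checkpointed status survives.
    BinlogHandshakeModel restart() {
        BinlogHandshakeModel restored = new BinlogHandshakeModel();
        restored.status = status;
        return restored;
    }
}
{code}

If a restart happens after three of the six messages, the restored model is back at the first step with status 3 and hands out nothing until the whole sequence is replayed, which is the dependency suggested fix 2 aims to remove.
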

*3. {{captureNewlyAddedTables()}} guard skips re-discovery* ({*}MySqlSnapshotSplitAssigner.java{*})

{{if (... && AssignerStatus.isAssigningFinished(assignerStatus))}}
This guard only passes for states 1 and 4. If the restored state is 2 or 3, the entire newly-added table re-discovery is skipped. The code assumes in-progress work will resume from checkpoint state, but as described above, the recovery path depends on transient fields and multi-step coordination that may not complete before the next restart.
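
A hypothetical Java sketch of the guard's effect on restore. Only the {{isAssigningFinished}} condition comes from the quoted code; the rest of the real condition is elided ({{...}}) in this issue and is represented by the placeholder parameter {{otherConditionsHold}}. Status codes follow the progression listed earlier.

{code:java}
// Hypothetical sketch of the captureNewlyAddedTables() guard; not the actual implementation.
class RediscoveryGuardSketch {
    // True only for INITIAL_ASSIGNING_FINISHED (1) and NEWLY_ADDED_ASSIGNING_FINISHED (4).
    static boolean isAssigningFinished(int status) {
        return status == 1 || status == 4;
    }

    // otherConditionsHold stands in for the elided "..." part of the real condition.
    static boolean wouldRediscover(boolean otherConditionsHold, int restoredStatus) {
        return otherConditionsHold && isAssigningFinished(restoredStatus);
    }
}
{code}

Restoring in state 2 or 3 returns {{false}}, so discovery is skipped and progress depends entirely on the in-flight mechanisms from points 1 and 2.
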

*4. Existing test acknowledges this fragility* ({*}NewlyAddedTableITCase.java{*}):

{{// sleep 1s to wait for the assign status to INITIAL_ASSIGNING_FINISHED.}}
{{// Otherwise, the restart job won't read newly added tables, and this test will be stuck.}}
{{Thread.sleep(1000L);}}

The combination of transient fields, multi-step handshakes, and guarded re-discovery creates multiple windows where a restart can permanently stall the state machine. The non-deterministic behavior across our two affected tables is consistent with this: the outcome depends on the exact assigner state at the checkpoint boundary, which varies by timing.

*Impact*
 * Large tables (100M+ rows) are disproportionately affected because their multi-day snapshot maximizes the window in which a restart can hit a vulnerable state
 * Silent failure: no errors are logged and the job appears healthy, so the problem is only detectable by monitoring per-table output
 * Unrecoverable without dropping the table from the pipeline and re-adding it (full re-snapshot)
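
Because the failure is only visible per table, a per-table liveness check is one way to catch it early. The following Java class is a hypothetical sketch, not part of Flink CDC or Paimon: it assumes you can observe a timestamp whenever output for a table reaches the sink, and the silence threshold is a deployment-specific choice.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-table liveness check; not part of Flink CDC or Paimon.
class PerTableSilenceDetector {
    private final Duration threshold;
    private final Map<String, Instant> lastSeen = new HashMap<>();

    PerTableSilenceDetector(Duration threshold) {
        this.threshold = threshold;
    }

    // Register every table from the pipeline config so a table that never emits is still tracked.
    void register(String table, Instant now) {
        lastSeen.putIfAbsent(table, now);
    }

    // Record that output for this table was observed at the given time (keeps the latest).
    void onRecord(String table, Instant at) {
        lastSeen.merge(table, at, (previous, current) -> previous.isAfter(current) ? previous : current);
    }

    // Tables whose last observed output is older than the threshold, sorted by name.
    List<String> silentTables(Instant now) {
        List<String> silent = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : lastSeen.entrySet()) {
            if (Duration.between(entry.getValue(), now).compareTo(threshold) > 0) {
                silent.add(entry.getKey());
            }
        }
        Collections.sort(silent);
        return silent;
    }
}
{code}

Registering tables from the pipeline config, rather than discovering them from output, matters here: a ghost table never emits, so it would otherwise never appear in the map.
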

 

*Suggested Fix Directions*
 # Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}} so the state machine can advance immediately after restore without waiting for 2 additional checkpoint cycles
 # Make the binlog update handshake (state 3→4) idempotent and automatically re-triggerable on restore, rather than relying on {{syncWithReaders()}} to eventually re-trigger it
 # Add a safety mechanism: if {{assignerStatus}} is in state 2 or 3 and no progress (split assignments or finished reports) occurs within N checkpoint cycles, log a warning and consider resetting the table's snapshot
 # Expose {{assignerStatus}} and per-table snapshot progress as JMX/Flink metrics. The lack of observability into the assigner state made this extremely difficult to diagnose and made it impossible to determine the exact failing code path
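
Suggested fix 3 could take roughly the following shape. This is a hypothetical Java sketch, not an existing Flink CDC API: the class name, the progress counter (split assignments plus finished-split reports, supplied by the caller) and the choice to return a flag for the caller to log, rather than logging directly, are assumptions.

{code:java}
// Hypothetical sketch of suggested fix 3; not an existing Flink CDC API.
class StalledAssignerWatchdog {
    private final int maxIdleCycles;     // the "N checkpoint cycles" from fix 3
    private long lastProgressCount = -1; // no baseline yet
    private int idleCycles;

    StalledAssignerWatchdog(int maxIdleCycles) {
        this.maxIdleCycles = maxIdleCycles;
    }

    // Call once per completed checkpoint with the status code (2 or 3 = newly-added
    // assigning in progress) and a monotonically increasing progress count.
    // Returns true when the caller should log a warning and consider a snapshot reset.
    boolean onCheckpointComplete(int status, long progressCount) {
        boolean inProgressState = status == 2 || status == 3;
        if (!inProgressState || progressCount != lastProgressCount) {
            idleCycles = 0; // progress was made, or no newly-added work is pending
        } else {
            idleCycles++;   // another checkpoint with nothing new
        }
        lastProgressCount = progressCount;
        return idleCycles >= maxIdleCycles;
    }
}
{code}

With {{maxIdleCycles = 3}}, the watchdog returns {{true}} once three consecutive checkpoints report no new progress while in state 2 or 3; any progress, or leaving states 2/3, resets the counter. With the 5-minute checkpoint interval from the environment above, N cycles corresponds to 5·N minutes.
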

*Workaround*

When adding large tables with {{scan.newly-added-table.enabled}}, avoid restarting the job until the initial snapshot completes. If a restart occurs mid-snapshot and the table becomes a ghost, remove the table from the pipeline config, restart, then re-add it to force a fresh snapshot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
