Vinay Sagar Gonabavi created FLINK-39478:
--------------------------------------------
Summary: Newly-added tables become ghost tables after job restart
during initial snapshot phase
Key: FLINK-39478
URL: https://issues.apache.org/jira/browse/FLINK-39478
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.5.0
Environment: * Flink CDC version: 3.4.0 (also seen on the latest version)
* {{scan.newly-added-table.enabled: true}}
* Checkpoint interval: 5 minutes, exactly-once, RocksDB backend
* Parallelism: 4
* Restarts triggered by both external savepoint-cancel and pod
replacement/checkpoint-restore
Reporter: Vinay Sagar Gonabavi
*Summary*
When {{scan.newly-added-table.enabled=true}} and a job restarts
(savepoint-cancel or checkpoint-restore) while a newly-added table is
mid-snapshot, the table can silently stop emitting events. The table remains in
the pipeline config and the job runs healthily for all other tables, but the
affected table produces zero data indefinitely.
*Production Evidence*
Two tables independently exhibited this behavior:
||Table||Rows||Snapshot duration||Paimon snapshots before death||Death trigger||Data after restart||
|{{table_1}}|100M+|multi-day|96 over 2 days|savepoint-cancel restart (Feb 27)|zero|
|{{table_2}}|100M+|multi-day|201 over 1.5 days|survived 1st restart, died on 2nd|zero|
* Job continues running healthily for all other tables (confirmed by Paimon
counts and operator metrics)
* ~60K records still flowing are all from working tables
* Both tables confirmed mid-snapshot at time of restart via S3 data
Notably, {{table_2}} survived one restart and died on the second. This
non-determinism points to a timing/race condition in the recovery path rather
than a guaranteed failure. The outcome depends on exactly which state the
assigner was in at the checkpoint boundary.
*Root Cause Analysis*
The Flink CDC MySQL source uses a finite state machine ({{AssignerStatus}})
to manage newly-added table snapshots. The state progression is:
{code:java}
INITIAL_ASSIGNING_FINISHED (1)
→ NEWLY_ADDED_ASSIGNING (2) // snapshot splits being assigned
→ NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED (3) // all splits done, awaiting binlog update
→ NEWLY_ADDED_ASSIGNING_FINISHED (4) // binlog handshake complete{code}
We were unable to determine the exact failing code path due to limited
observability into the assigner state at the time of the restarts. However,
code analysis reveals that a restart during states 2 or 3 exposes multiple
fragile recovery windows, any of which could permanently stall the state
machine:
*1. Transient {{checkpointIdToFinish}} field (MySqlSnapshotSplitAssigner.java)*
This field gates the {{NEWLY_ADDED_ASSIGNING → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED}}
transition but is *not checkpointed*. After restore it's {{null}}, requiring 2 additional
checkpoint cycles to re-derive (one {{snapshotState()}} to set it, one
{{notifyCheckpointComplete()}} to act on it). If the job restarts again within
those 2 cycles, the window resets. This could explain why
{{table_2}} survived one restart but not the second: the
first restart may have landed on a clean boundary, while the second hit before
the 2-cycle recovery completed.
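The gating described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not the Flink CDC implementation: {{ToyStatus}}, {{ToyAssigner}} and {{restore()}} are hypothetical names, and the conditions are simplified to what this report describes (a gate set in {{snapshotState()}}, acted on in {{notifyCheckpointComplete()}}, and lost on restore).

```java
// Toy model only; NOT the Flink CDC source. All class and method names are hypothetical.
enum ToyStatus { NEWLY_ADDED_ASSIGNING, NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED }

class ToyAssigner {
    ToyStatus status = ToyStatus.NEWLY_ADDED_ASSIGNING; // assumed to be checkpointed
    boolean allSplitsFinished = true;                   // assumed to be checkpointed
    Long checkpointIdToFinish = null;                   // transient: lost on restore

    /** Step 1: record which checkpoint has to complete before the state may advance. */
    void snapshotState(long checkpointId) {
        if (checkpointIdToFinish == null
                && status == ToyStatus.NEWLY_ADDED_ASSIGNING
                && allSplitsFinished) {
            checkpointIdToFinish = checkpointId;
        }
    }

    /** Step 2: advance only if the gate was set by an earlier snapshotState() call. */
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointIdToFinish != null
                && status == ToyStatus.NEWLY_ADDED_ASSIGNING
                && checkpointId >= checkpointIdToFinish) {
            status = ToyStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
        }
    }

    /** Simulated restore: checkpointed fields are copied, the transient gate is not. */
    ToyAssigner restore() {
        ToyAssigner restored = new ToyAssigner();
        restored.status = this.status;
        restored.allSplitsFinished = this.allSplitsFinished;
        // checkpointIdToFinish is intentionally left null
        return restored;
    }
}
```

In this model, a restart that lands between the two steps leaves the restored assigner in {{NEWLY_ADDED_ASSIGNING}} even though every split is finished, and it only advances after a fresh {{snapshotState()}} and {{notifyCheckpointComplete()}} pair.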
*2. Multi-step binlog update handshake between states 3→4*
(*MySqlSourceEnumerator.java*)
The transition from state 3 to 4 requires: enumerator sends
{{BinlogSplitUpdateRequestEvent}} → reader suspends binlog → reader requests
latest finished splits number → enumerator responds → reader updates binlog
split → reader sends {{BinlogSplitUpdateAckEvent}} →
{{onBinlogSplitUpdated()}}. A restart during this handshake restores state
3, and {{MySqlHybridSplitAssigner.getNext()}} returns {{Optional.empty()}} for
all split requests (lines 111-113 of
{{flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java}})
until {{requestBinlogSplitUpdateIfNeed()}} re-triggers the handshake via
{{syncWithReaders()}}.
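The stall window in this handshake can be sketched with a toy enumerator. This is a simplified illustration, not the actual {{MySqlSourceEnumerator}} or {{MySqlHybridSplitAssigner}} code: {{ToyHandshakeEnumerator}}, its fields, and the placeholder split value are hypothetical, and the reader side of the handshake is reduced to a single ack call.

```java
// Toy model only; NOT the Flink CDC source. All names and values are hypothetical.
class ToyHandshakeEnumerator {
    static final int SNAPSHOT_FINISHED = 3; // NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED
    static final int FINISHED = 4;          // NEWLY_ADDED_ASSIGNING_FINISHED

    int status = SNAPSHOT_FINISHED; // state as restored from the checkpoint
    int updateRequestsSent = 0;     // how often the update request was (re-)sent

    /** While the handshake is pending, no split is handed out. */
    java.util.Optional<String> getNext() {
        if (status == SNAPSHOT_FINISHED) {
            return java.util.Optional.empty();
        }
        return java.util.Optional.of("placeholder-split");
    }

    /** Re-sends the update request for as long as the state still needs it. */
    void requestBinlogSplitUpdateIfNeed() {
        if (status == SNAPSHOT_FINISHED) {
            updateRequestsSent++;
        }
    }

    /** Handles the reader's ack; a duplicate ack is a no-op. */
    void onBinlogSplitUpdated() {
        if (status == SNAPSHOT_FINISHED) {
            status = FINISHED;
        }
    }
}
```

In this model the restored enumerator hands out nothing until an ack arrives, so the table's progress depends entirely on the request being re-sent after every restore.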
*3. {{captureNewlyAddedTables()}} guard skips re-discovery*
(*MySqlSnapshotSplitAssigner.java*)
{{if (... && AssignerStatus.isAssigningFinished(assignerStatus)) {}}
This only passes for states 1 and 4. If the restored state is 2 or 3, the
entire newly-added table re-discovery is skipped. The code assumes in-progress
work will resume from checkpoint state, but as described above, the recovery
path depends on transient fields and multi-step coordination that may not
complete before the next restart.
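The effect of this guard on each restored state can be sketched as follows. This is a simplified model rather than the real {{AssignerStatus}} class: {{ToyAssignerStatus}} and {{ToyDiscovery}} are hypothetical names, the numeric codes are the ones used in this report, and the elided part of the original condition is represented by a single hypothetical boolean, {{otherConditions}}.

```java
// Toy model only; NOT the Flink CDC source. All names are hypothetical.
enum ToyAssignerStatus {
    INITIAL_ASSIGNING_FINISHED(1),
    NEWLY_ADDED_ASSIGNING(2),
    NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3),
    NEWLY_ADDED_ASSIGNING_FINISHED(4);

    final int code;

    ToyAssignerStatus(int code) {
        this.code = code;
    }

    /** True only for the two "finished" states (1 and 4), as described in this report. */
    static boolean isAssigningFinished(ToyAssignerStatus status) {
        return status == INITIAL_ASSIGNING_FINISHED
                || status == NEWLY_ADDED_ASSIGNING_FINISHED;
    }
}

class ToyDiscovery {
    /** otherConditions stands in for the part of the real condition elided above. */
    static boolean wouldRediscover(boolean otherConditions, ToyAssignerStatus restored) {
        return otherConditions && ToyAssignerStatus.isAssigningFinished(restored);
    }
}
```

With {{otherConditions}} set to true, {{wouldRediscover}} returns true for states 1 and 4 and false for states 2 and 3, which are exactly the states a mid-snapshot restart restores.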
*4. Existing test acknowledges this fragility*
(*NewlyAddedTableITCase.java*):
{code:java}
// sleep 1s to wait for the assign status to INITIAL_ASSIGNING_FINISHED.
// Otherwise, the restart job won't read newly added tables, and this test will be stuck.
Thread.sleep(1000L);{code}
The combination of transient fields, multi-step handshakes, and guarded
re-discovery creates multiple windows where a restart can permanently stall the
state machine. The non-deterministic behavior across our two affected tables is
consistent with this: the outcome depends on the exact assigner state at the
checkpoint boundary, which varies by timing.
*Impact*
* Large tables (100M+ rows) are disproportionately affected because their
multi-day snapshot maximizes the window where a restart hits the vulnerable
state
* Silent failure - no errors logged, job appears healthy, only detectable by
monitoring per-table output
* Unrecoverable without dropping the table from the pipeline and re-adding it
(full re-snapshot)
*Suggested Fix Directions*
# Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}} so the
state machine can advance immediately after restore without waiting for 2
additional checkpoint cycles
# Make the binlog update handshake (state 3→4) idempotent and automatically
re-triggerable on restore, rather than relying on {{syncWithReaders()}} to
eventually re-trigger it
# Add a safety mechanism: if {{assignerStatus}} is in state 2 or 3 and no
progress (split assignments or finished reports) occurs within N checkpoint
cycles, log a warning and consider resetting the table's snapshot
# Expose {{assignerStatus}} and per-table snapshot progress as JMX/Flink
metrics - the lack of observability into the assigner state made this extremely
difficult to diagnose and made it impossible to determine the exact failing
code path
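The safety mechanism in the third suggestion could look roughly like the sketch below. It is only an illustration under stated assumptions: {{StallWatchdog}} is a hypothetical class that does not exist in Flink CDC, the assigner state is passed as the numeric code used in this report, and {{progressCounter}} is an assumed monotonically increasing count of split assignments plus finished reports.

```java
// Hypothetical helper; NOT part of Flink CDC. A sketch of suggestion 3 above.
class StallWatchdog {
    private final int maxIdleCheckpoints;
    private int idleCheckpoints = 0;
    private long lastProgress = -1L;

    StallWatchdog(int maxIdleCheckpoints) {
        this.maxIdleCheckpoints = maxIdleCheckpoints;
    }

    /**
     * Call once per completed checkpoint.
     *
     * @param assignerState   numeric assigner state (2 and 3 are the in-flight states)
     * @param progressCounter assumed count of split assignments plus finished reports
     * @return true when the in-flight state has seen no progress for N checkpoints
     */
    boolean onCheckpoint(int assignerState, long progressCounter) {
        boolean inFlight = assignerState == 2 || assignerState == 3;
        if (!inFlight || progressCounter != lastProgress) {
            idleCheckpoints = 0; // progress was made, or nothing is in flight
        } else {
            idleCheckpoints++;
        }
        lastProgress = progressCounter;
        return inFlight && idleCheckpoints >= maxIdleCheckpoints;
    }
}
```

A caller would log a warning (and possibly reset the table's snapshot) when {{onCheckpoint}} returns true. Counting checkpoints rather than wall-clock time keeps the threshold proportional to the configured checkpoint interval.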
*Workaround*
When adding large tables with {{scan.newly-added-table.enabled}}, avoid
restarting the job until the initial snapshot completes. If a restart occurs
mid-snapshot and the table becomes a ghost, remove the table from the pipeline
config, restart, then re-add it to force a fresh snapshot.