jubins opened a new pull request, #4410:
URL: https://github.com/apache/flink-cdc/pull/4410

   ## What is the purpose of the change
   
   Fixes FLINK-39478 — when `scan.newly-added-table.enabled=true` and a job 
restarts mid-snapshot of a newly-added table, the affected table can silently 
stop emitting events for the rest of the job's lifetime. The root cause is that 
`MySqlSnapshotSplitAssigner.checkpointIdToFinish` — the field that gates the 
`NEWLY_ADDED_ASSIGNING → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED` transition — 
is held only in memory (`@Nullable`, not part of `SnapshotPendingSplitsState`), and on restore 
must be re-derived over two more checkpoint cycles (one `snapshotState()` to 
set it, one `notifyCheckpointComplete()` to act on it). If a second restart 
lands inside that recovery window, the field resets and the state machine 
stalls. The table remains in the pipeline config and the job appears healthy, 
but produces zero data thereafter. Reported in production on two ~100M-row 
tables; the non-deterministic "survived first restart, died on second" pattern 
is consistent with the timing race described above.
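
The gate described above can be reduced to a toy model. This is a hypothetical sketch, not connector code. `CheckpointIdToFinishModel`, its nested `Assigner`/`State` types, the `persistField` switch, and the `restartAfterSnapshot` helper are illustrative names. The model also assumes every snapshot split of the newly added table has already finished, whereas the real assigner checks that condition itself. The model shows only what survives a restore. Under the pre-fix ordering the checkpointed state never carries `checkpointIdToFinish`, so a restored assigner needs another `snapshotState()` before `notifyCheckpointComplete()` can act. With the field persisted, the gate is already armed after restore.

```java
/**
 * Hypothetical toy model of the NEWLY_ADDED_ASSIGNING -> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED
 * gate. Not connector code: only the fields relevant to FLINK-39478 are modelled, and every
 * snapshot split of the newly added table is assumed to have finished already.
 */
public class CheckpointIdToFinishModel {

    enum Status { NEWLY_ADDED_ASSIGNING, NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED }

    /** Stand-in for SnapshotPendingSplitsState, reduced to two fields. */
    static final class State {
        final Status status;
        final Long checkpointIdToFinish;

        State(Status status, Long checkpointIdToFinish) {
            this.status = status;
            this.checkpointIdToFinish = checkpointIdToFinish;
        }
    }

    static final class Assigner {
        final boolean persistField; // false: ordering before the fix; true: after
        Status status = Status.NEWLY_ADDED_ASSIGNING;
        Long checkpointIdToFinish; // gate for the status transition

        Assigner(boolean persistField) {
            this.persistField = persistField;
        }

        /** Rebuilds an assigner from checkpointed state, as after a job restart. */
        static Assigner restore(State state, boolean persistField) {
            Assigner restored = new Assigner(persistField);
            restored.status = state.status;
            restored.checkpointIdToFinish = state.checkpointIdToFinish;
            return restored;
        }

        State snapshotState(long checkpointId) {
            if (persistField) {
                armGate(checkpointId); // set first so this checkpoint captures it
                return new State(status, checkpointIdToFinish);
            }
            // Pre-fix: the state is built before the gate is set and has no slot for it.
            State state = new State(status, null);
            armGate(checkpointId);
            return state;
        }

        void notifyCheckpointComplete(long checkpointId) {
            if (status == Status.NEWLY_ADDED_ASSIGNING
                    && checkpointIdToFinish != null
                    && checkpointId >= checkpointIdToFinish) {
                status = Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
            }
        }

        private void armGate(long checkpointId) {
            if (checkpointIdToFinish == null && status == Status.NEWLY_ADDED_ASSIGNING) {
                checkpointIdToFinish = checkpointId;
            }
        }
    }

    /** Checkpoint 7 is taken, then the job restarts before notifyCheckpointComplete(7) arrives. */
    static Assigner restartAfterSnapshot(boolean persistField) {
        Assigner original = new Assigner(persistField);
        State checkpoint7 = original.snapshotState(7L);
        return Assigner.restore(checkpoint7, persistField);
    }

    public static void main(String[] args) {
        for (boolean persist : new boolean[] {false, true}) {
            Assigner restored = restartAfterSnapshot(persist);
            System.out.println("persistField=" + persist
                    + ", restored checkpointIdToFinish=" + restored.checkpointIdToFinish);
        }
    }
}
```

In the model, the pre-fix path still recovers once an uninterrupted `snapshotState()`/`notifyCheckpointComplete()` pair completes after the restore. Persisting the field removes the need for that extra pair.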
   
   This change persists `checkpointIdToFinish` into the checkpointed state so 
the state machine can resume in one step after restore, and adds 
enumerator-side metrics (`assignerStatus`, aggregate snapshot progress 
counters) so operators can observe whether the assigner is making progress 
during long newly-added-table snapshots — the lack of which made the original 
incident extremely hard to diagnose. The companion fix for the multi-step 
`BinlogSplitUpdateRequest/Ack` handshake between states 3→4 (fix direction #2 
in the JIRA) is deferred to a follow-up because it touches the 
enumerator↔reader RPC protocol and has a larger blast radius — this PR 
addresses what we believe is the most likely root cause of the observed 
incidents, plus the observability gap.
   
   ## Brief change log
   - Added `@Nullable Long checkpointIdToFinish` to 
`SnapshotPendingSplitsState` (flink-connector-mysql-cdc), with a new 11-arg 
constructor. The pre-existing 10-arg constructor is preserved as a delegating 
overload that defaults the new field to `null`, so all existing call sites 
compile unchanged and no test fixture data needs to change.
   - `snapshotState()` in `MySqlSnapshotSplitAssigner` now sets 
`checkpointIdToFinish` **before** building the state object, so the value is 
captured in the same checkpoint instead of the next one — eliminating the 
2-checkpoint recovery window after a restart.
   - Bumped `PendingSplitsStateSerializer` from `VERSION = 5` to `VERSION = 6`. 
The serialize path appends a trailer: a `boolean` presence flag, followed by the 
`long` value only when the flag is `true`. The deserialize path reads the trailer 
only when `version >= 6`. v5 payloads continue to deserialize with 
`checkpointIdToFinish=null`, so no savepoint migration is required.
   - New `MySqlSourceEnumeratorMetrics` (flink-connector-mysql-cdc) exposes the 
following gauges on the enumerator metric group: `assignerStatus` (status 
code), `assignerStatusName` (status name), `numRemainingTables`, 
`numRemainingSnapshotSplits`, `numAssignedSnapshotSplits`, 
`numFinishedSnapshotSplits`, `numAlreadyProcessedTables`. Registered from 
`MySqlSourceEnumerator#start`, wrapped in a try/catch so metric-registration 
failures can never break enumerator startup.
   - Added five `default int get*Count()` accessors to `MySqlSplitAssigner` 
(`getRemainingSplitsCount`, `getRemainingTablesCount`, 
`getAssignedSplitsCount`, `getFinishedSplitsCount`, 
`getAlreadyProcessedTablesCount`). Each default returns `0`, preserving 
source compatibility for any external implementor. Overridden in
`MySqlSnapshotSplitAssigner` and delegated in `MySqlHybridSplitAssigner`; 
`MySqlBinlogSplitAssigner` uses the no-op defaults (its phase has no snapshot 
work to report).
   - Extended `PendingSplitsStateSerializerTest` with a new round-trip case 
carrying a non-null `checkpointIdToFinish`, plus a v5-payload backward-compat 
test confirming the field reads back as `null` on the old code path.
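
The accessor and gauge bullets above can be sketched in isolation. Everything below is hypothetical. `SplitAssigner`, `SnapshotAssigner`, `HybridAssigner`, `BinlogOnlyAssigner`, and `GaugeRegistry` are simplified stand-ins, and the registry only loosely mirrors Flink's `MetricGroup#gauge`. The backing collections are invented for illustration, and the `assignerStatus`/`assignerStatusName` gauges are omitted. The sketch shows three behaviours: an old implementor compiling against the defaults, the hybrid assigner delegating, and registration failures being swallowed rather than propagated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntSupplier;

/** Hypothetical sketch; none of these types are the connector's real classes. */
public class AssignerMetricsSketch {

    /** Simplified stand-in for MySqlSplitAssigner with the five new default accessors. */
    interface SplitAssigner {
        default int getRemainingSplitsCount() { return 0; }
        default int getRemainingTablesCount() { return 0; }
        default int getAssignedSplitsCount() { return 0; }
        default int getFinishedSplitsCount() { return 0; }
        default int getAlreadyProcessedTablesCount() { return 0; }
    }

    /** An implementor written before the accessors existed compiles unchanged and reports 0. */
    static final class BinlogOnlyAssigner implements SplitAssigner {}

    /** Snapshot-phase assigner: each count is an O(1) size lookup. */
    static final class SnapshotAssigner implements SplitAssigner {
        final List<String> remainingSplits = new ArrayList<>();
        final List<String> remainingTables = new ArrayList<>();
        final Map<String, String> assignedSplits = new LinkedHashMap<>();
        final Map<String, Long> finishedSplits = new LinkedHashMap<>();
        final List<String> alreadyProcessedTables = new ArrayList<>();

        @Override public int getRemainingSplitsCount() { return remainingSplits.size(); }
        @Override public int getRemainingTablesCount() { return remainingTables.size(); }
        @Override public int getAssignedSplitsCount() { return assignedSplits.size(); }
        @Override public int getFinishedSplitsCount() { return finishedSplits.size(); }
        @Override public int getAlreadyProcessedTablesCount() { return alreadyProcessedTables.size(); }
    }

    /** Hybrid assigner: forwards every count to its snapshot-phase delegate. */
    static final class HybridAssigner implements SplitAssigner {
        private final SnapshotAssigner snapshot;

        HybridAssigner(SnapshotAssigner snapshot) { this.snapshot = snapshot; }

        @Override public int getRemainingSplitsCount() { return snapshot.getRemainingSplitsCount(); }
        @Override public int getRemainingTablesCount() { return snapshot.getRemainingTablesCount(); }
        @Override public int getAssignedSplitsCount() { return snapshot.getAssignedSplitsCount(); }
        @Override public int getFinishedSplitsCount() { return snapshot.getFinishedSplitsCount(); }
        @Override public int getAlreadyProcessedTablesCount() { return snapshot.getAlreadyProcessedTablesCount(); }
    }

    /** Hypothetical registry; loosely mirrors MetricGroup#gauge(String, Gauge). */
    interface GaugeRegistry {
        void gauge(String name, IntSupplier value);
    }

    /** Registers the gauges; returns false instead of throwing if registration fails. */
    static boolean registerGauges(GaugeRegistry registry, SplitAssigner assigner) {
        try {
            registry.gauge("numRemainingTables", assigner::getRemainingTablesCount);
            registry.gauge("numRemainingSnapshotSplits", assigner::getRemainingSplitsCount);
            registry.gauge("numAssignedSnapshotSplits", assigner::getAssignedSplitsCount);
            registry.gauge("numFinishedSnapshotSplits", assigner::getFinishedSplitsCount);
            registry.gauge("numAlreadyProcessedTables", assigner::getAlreadyProcessedTablesCount);
            return true;
        } catch (RuntimeException e) {
            // A real enumerator would log this; startup must continue regardless.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, IntSupplier> gauges = new LinkedHashMap<>();
        SnapshotAssigner snapshot = new SnapshotAssigner();
        snapshot.remainingSplits.add("orders:0");
        registerGauges(gauges::put, new HybridAssigner(snapshot));
        gauges.forEach((name, value) -> System.out.println(name + "=" + value.getAsInt()));
    }
}
```

Each gauge holds a method reference rather than a captured value, so the count is recomputed on every read. The cost of a read is therefore a single size lookup.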
   
   ## Verifying this change
   
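   Apart from moving the `checkpointIdToFinish` assignment ahead of state 
construction in `snapshotState()`, this change only persists one additional field 
and registers new metric gauges; no data path or split-assignment logic is 
altered. Verification: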
   
   - New parameterized test variant in `PendingSplitsStateSerializerTest` 
(`getTestSnapshotPendingSplitsStateWithCheckpointIdToFinish`) covers snapshot- 
and hybrid-state round-trips with a non-null `checkpointIdToFinish` (42L). All 
31 tests in the class pass.
   - New `testDeserializeV5MissingCheckpointIdToFinish` asserts that a payload 
deserialized under the previous version number reads 
`checkpointIdToFinish=null` and does not attempt to read the v6 trailer, which 
exercises the version-gated read branch.
   - Existing `equals`/`hashCode` round-trip in 
`PendingSplitsStateSerializerTest` now also exercises the new field via state 
equality (the field is included in both methods).
   - `mvn test-compile` passes for the whole `flink-connector-mysql-cdc` 
module, validating that the new interface defaults don't break any existing 
implementor (including the test-only assigner subclasses).
   - `mvn spotless:check` passes on the changed module.
   
   Note: `MySqlSnapshotSplitAssignerTest` and `MySqlHybridSplitAssignerTest` 
were not runnable in the local environment because they spin up MySQL via 
testcontainers (Docker not available). They should pass unchanged in CI — none 
of their fixture data construction needed updating thanks to the preserved 
10-arg constructor overload.
   
   A deterministic IT case that injects a restart between `snapshotState()` 
setting `checkpointIdToFinish` and `notifyCheckpointComplete()` acting on it — 
i.e., a regression test that fails on `master` and passes on this branch — is 
the natural next step but is outside the scope of this PR. Happy to add it as a 
follow-up if reviewers prefer; it requires some thought about how to 
deterministically schedule the restart at the vulnerable window without flaking.
   
   ## Does this pull request potentially affect one of the following parts
   
   - **Dependencies (does it add or upgrade a dependency):** no
   - **The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:** the `MySqlSplitAssigner` interface is `@Internal`, not 
`@PublicEvolving` — the new `default` methods are nonetheless source- and 
binary-compatible for any external implementor (defaults return `0`). 
`SnapshotPendingSplitsState` is also internal to the connector. No 
`@PublicEvolving` surface is touched.
   - **The serializers:** yes — `PendingSplitsStateSerializer` is bumped from 
version 5 to version 6. The new payload appends a `boolean + (optional) long`. 
The deserializer accepts versions 1–6; v5 payloads deserialize unchanged (with 
`checkpointIdToFinish=null`), so existing savepoints/checkpoints restore 
without migration.
   - **The runtime per-record code paths (performance sensitive):** no. 
`checkpointIdToFinish` is read/written only at checkpoint and restore. Metric 
gauges are evaluated only when a metric reporter polls them, and their suppliers 
are `O(1)` (size lookups plus a field read).
   - **Anything that affects deployment or recovery (JobManager, Checkpointing, 
Kubernetes/Yarn, ZooKeeper):** yes — the checkpointed enumerator state schema 
changes (additive, backward-compatible). Existing checkpoints/savepoints 
restore cleanly via the v5 read path.
   - **The S3 file system connector:** no
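
The version-gated trailer described in the serializer bullet can be illustrated with plain `java.io` streams. This is a hypothetical sketch. `VersionedTrailerSketch` is an invented name, and its single `writeUTF` body stands in for all pre-existing fields. The `java.io` stream classes are also an assumption, since the real serializer writes many more fields and may use Flink's own data views. The sketch relies on the reader being told which version produced the bytes, as the `SimpleVersionedSerializer#deserialize(int, byte[])` contract in Flink provides.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of an optional, version-gated trailer; not the connector's serializer. */
public class VersionedTrailerSketch {

    static final int CURRENT_VERSION = 6;

    /** Writes the body, then (for v6 and later) a presence flag and the value if present. */
    static byte[] serialize(int version, String body, Long checkpointIdToFinish) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(body); // stands in for every field that v5 already wrote
            if (version >= 6) {
                out.writeBoolean(checkpointIdToFinish != null);
                if (checkpointIdToFinish != null) {
                    out.writeLong(checkpointIdToFinish);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the body, and the trailer only when the writer's version is 6 or later. */
    static Long readCheckpointIdToFinish(int version, byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readUTF(); // fields shared by v5 and v6
            if (version >= 6 && in.readBoolean()) {
                return in.readLong();
            }
            return null; // v5 payload, or a v6 payload written with no value
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] v6 = serialize(CURRENT_VERSION, "state", 42L);
        byte[] v5 = serialize(5, "state", null);
        System.out.println("v6 -> " + readCheckpointIdToFinish(CURRENT_VERSION, v6));
        System.out.println("v5 -> " + readCheckpointIdToFinish(5, v5));
        System.out.println("trailer bytes: " + (v6.length - v5.length));
    }
}
```

The flag is written even when the value is absent. This keeps the v6 layout fixed, so a v6 reader never has to guess whether trailing bytes exist.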
   
   ## Documentation
   
   - **Does this pull request introduce a new feature?** It introduces new 
enumerator metrics (`assignerStatus`, plus aggregate snapshot progress gauges) 
on the MySQL CDC source. The state-persistence change is a bug fix rather than 
a feature.
   - **If yes, how is the feature documented?** Inline Javadoc on 
`MySqlSourceEnumeratorMetrics` and on each metric-name constant documents the 
contract. The metric names follow the same convention as the existing 
`SourceEnumeratorMetrics` in `flink-cdc-base`. No standalone docs page change 
is required for this PR; if maintainers prefer, the MySQL connector's metrics 
table in the docs can be extended in a follow-up.
   
   ## Was generative AI tooling used to co-author this PR?
   - [x] Yes — Claude was used as a pair-programming assistant for discussing 
the approach, locating the fragile state-machine windows in the code, and 
drafting the implementation. All code was reviewed, understood, and verified by 
the author.
   
   Generated-by: Claude Opus 4.7

