jubins opened a new pull request, #28318:
URL: https://github.com/apache/flink/pull/28318

   ## What is the purpose of the change
   
   Fixes **FLINK-39837** — Dynamic Kafka sources (and other dynamic sources) 
are vulnerable to metadata service instability during distributed system 
operations like rollouts or restarts. When metadata configs oscillate between 
versions (e.g., `n → n-1 → n`), the source may detect a partition/cluster 
removal at version `n-1` and immediately purge it from the checkpoint. When 
version `n` reappears, the split is treated as brand new and consumption resets 
to `EARLIEST`, causing duplicate processing or data loss.
   
   This PR introduces **configurable tombstone-based retention** at the 
framework level (`SplitAssignmentTracker`) to make dynamic sources resilient to 
temporary metadata inconsistencies. When a split is removed, it can be retained 
in checkpoint state for a configurable duration (e.g., 24 hours). If the split 
reappears within the retention window, its progress is restored instead of 
resetting to the beginning.
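   The retention rule can be sketched in plain Java. This is a minimal, hypothetical stand-in for the tombstone part of `SplitAssignmentTracker`, not the PR's code. Splits are plain `String`s keyed by id. Time comes from an injected `LongSupplier` clock so that expiry is deterministic. The class name `TombstoneTracker` and the helper `tombstoneCount()` are illustrative only. The method names follow those listed in the change log.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, simplified stand-in for the tombstone logic described for SplitAssignmentTracker. */
final class TombstoneTracker<SplitT> {

    /** Removed split plus removal time and last owner, mirroring the PR's RemovedSplitInfo. */
    static final class RemovedSplitInfo<S> {
        final S split;
        final long removedAtMs;
        final int lastSubtaskId; // kept for reassignment; unused in this sketch

        RemovedSplitInfo(S split, long removedAtMs, int lastSubtaskId) {
            this.split = split;
            this.removedAtMs = removedAtMs;
            this.lastSubtaskId = lastSubtaskId;
        }
    }

    private final long retentionMs;
    private final LongSupplier clock; // injected so expiry is deterministic in tests
    private final Map<String, RemovedSplitInfo<SplitT>> tombstones = new HashMap<>();

    TombstoneTracker(long retentionMs, LongSupplier clock) {
        this.retentionMs = retentionMs;
        this.clock = clock;
    }

    /** Records a tombstone; with retention 0 the split is simply dropped, as before. */
    void markSplitRemoved(String splitId, SplitT split, int subtaskId) {
        if (retentionMs > 0) {
            tombstones.put(splitId, new RemovedSplitInfo<>(split, clock.getAsLong(), subtaskId));
        }
    }

    /** Returns the retained split if its tombstone is still within the window, else null. */
    SplitT tryResurrectSplit(String splitId) {
        RemovedSplitInfo<SplitT> info = tombstones.remove(splitId); // consumed either way
        if (info == null || isExpired(info)) {
            return null; // unknown or expired: the caller creates a fresh split
        }
        return info.split;
    }

    /** Drops expired tombstones; the PR runs this on checkpoint completion. */
    void cleanupExpiredTombstones() {
        tombstones.values().removeIf(this::isExpired);
    }

    int tombstoneCount() {
        return tombstones.size();
    }

    private boolean isExpired(RemovedSplitInfo<SplitT> info) {
        return clock.getAsLong() - info.removedAtMs > retentionMs;
    }
}

class TombstoneRetentionDemo {
    public static void main(String[] args) {
        long[] now = {0L}; // manual clock, in milliseconds
        long retentionMs = Duration.ofHours(24).toMillis(); // 86_400_000, the value the PR suggests
        TombstoneTracker<String> tracker = new TombstoneTracker<>(retentionMs, () -> now[0]);

        // Metadata version n-1: partition "topic-0" vanishes; its progress becomes a tombstone.
        tracker.markSplitRemoved("topic-0", "topic-0@offset=42", 3);

        // Version n, one hour later: the partition reappears and its progress is restored.
        now[0] += Duration.ofHours(1).toMillis();
        System.out.println(tracker.tryResurrectSplit("topic-0")); // topic-0@offset=42

        // With the default retention of 0 nothing is retained, so the split would restart.
        TombstoneTracker<String> disabled = new TombstoneTracker<>(0L, () -> now[0]);
        disabled.markSplitRemoved("topic-0", "topic-0@offset=42", 3);
        System.out.println(disabled.tryResurrectSplit("topic-0")); // null
    }
}
```

   In this sketch a tombstone whose age exactly equals the retention is still considered valid. Whether the real tracker uses an inclusive or exclusive boundary is not stated in the PR.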
   
   ## Brief change log
   
   - **Added `REMOVED_SPLITS_RETENTION_MS` configuration option** in 
`SourceReaderOptions` (default: `0` for backward compatibility, recommended: 
`86400000` for 24-hour retention in dynamic sources)
   - **Extended `SplitAssignmentTracker`** with tombstone tracking:
     - New inner class `RemovedSplitInfo<SplitT>` stores removed splits with 
removal timestamp and last assigned subtask
     - `markSplitRemoved(splitId, split, subtaskId)` — records split removal as 
tombstone (if retention enabled)
     - `tryResurrectSplit(splitId)` — attempts to restore split from tombstone 
if within retention window
     - `cleanupExpiredTombstones()` — automatically removes expired entries on 
checkpoint completion
   - **Updated checkpoint serialization** in `SourceCoordinatorSerdeUtils`:
     - Bumped version to `VERSION_2` to include tombstone state
     - Added `serializeTombstones()` and `deserializeTombstones()` methods
     - Maintains full backward compatibility with `VERSION_0` and `VERSION_1` 
checkpoints
   - **Extended `SplitAssignmentTracker` snapshot/restore logic**:
     - `snapshotState()` now persists both assignments and tombstones
     - `restoreState()` gracefully handles old checkpoints without tombstones 
(backward compatibility)
   - **Added a constructor overload to `SourceCoordinatorContext`** that accepts a `removedSplitsRetentionMs` parameter (the existing constructor preserves current behavior, i.e. no retention)
   - **JavaDoc on `SplitAssignmentTracker`** with a Kafka connector usage example showing how to call `markSplitRemoved()` and `tryResurrectSplit()` from partition discovery logic
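   The version gate for the new tombstone section can be sketched with `java.io` data streams. This is a hypothetical layout for illustration, not the byte format of Flink's `SourceCoordinatorSerdeUtils`. State is reduced to a `splitId -> subtask` assignment map plus a tombstone map, and splits are plain `String`s. `TombstoneSerde`, `State`, and `Tombstone` are made-up names. The sketch models only `VERSION_1` and `VERSION_2`; a real reader would also have to keep handling `VERSION_0`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical version-gated checkpoint format; not Flink's actual byte layout. */
final class TombstoneSerde {
    static final int VERSION_1 = 1; // assignments only
    static final int VERSION_2 = 2; // assignments followed by a tombstone section

    /** Tombstone payload: the (stringified) split, removal time, last owning subtask. */
    static final class Tombstone {
        final String split;
        final long removedAtMs;
        final int lastSubtaskId;

        Tombstone(String split, long removedAtMs, int lastSubtaskId) {
            this.split = split;
            this.removedAtMs = removedAtMs;
            this.lastSubtaskId = lastSubtaskId;
        }
    }

    /** Restored state: splitId -> subtask, plus splitId -> tombstone. */
    static final class State {
        final Map<String, Integer> assignments = new LinkedHashMap<>();
        final Map<String, Tombstone> tombstones = new LinkedHashMap<>();
    }

    static byte[] serialize(int version, Map<String, Integer> assignments,
                            Map<String, Tombstone> tombstones) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeInt(assignments.size());
            for (Map.Entry<String, Integer> e : assignments.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
            if (version >= VERSION_2) {
                serializeTombstones(out, tombstones); // new section is appended after the old fields
            }
        }
        return bytes.toByteArray();
    }

    static State deserialize(byte[] data) throws IOException {
        State state = new State();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION_1 && version != VERSION_2) {
                throw new IOException("Unsupported checkpoint version: " + version);
            }
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                state.assignments.put(in.readUTF(), in.readInt()); // args evaluate left to right
            }
            if (version >= VERSION_2) {
                state.tombstones.putAll(deserializeTombstones(in));
            } // VERSION_1 checkpoints carry no tombstones: restore with an empty map
        }
        return state;
    }

    private static void serializeTombstones(DataOutputStream out, Map<String, Tombstone> tombstones)
            throws IOException {
        out.writeInt(tombstones.size());
        for (Map.Entry<String, Tombstone> e : tombstones.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue().split);
            out.writeLong(e.getValue().removedAtMs);
            out.writeInt(e.getValue().lastSubtaskId);
        }
    }

    private static Map<String, Tombstone> deserializeTombstones(DataInputStream in) throws IOException {
        Map<String, Tombstone> result = new LinkedHashMap<>();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            // Java evaluates arguments left to right, matching the write order above.
            result.put(in.readUTF(), new Tombstone(in.readUTF(), in.readLong(), in.readInt()));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> assignments = Map.of("topic-0", 1);
        Map<String, Tombstone> tombstones = Map.of("topic-1", new Tombstone("topic-1@offset=7", 1_000L, 2));
        // An old-format checkpoint restores with an empty tombstone map.
        System.out.println(deserialize(serialize(VERSION_1, assignments, tombstones)).tombstones.size()); // 0
        // A VERSION_2 checkpoint round-trips the tombstone.
        System.out.println(deserialize(serialize(VERSION_2, assignments, tombstones)).tombstones.get("topic-1").split);
    }
}
```

   The tombstone section is appended after the existing fields and read only when the version says it is present. That ordering is what lets older checkpoints restore unchanged, with an empty tombstone map.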
   
   ## Verifying this change
   
   This change is covered by **8 new unit tests** in 
`SplitAssignmentTrackerTest`:
   
   - `testMarkSplitRemovedWithRetention()` — verifies tombstone creation when 
retention is enabled
   - `testMarkSplitRemovedWithoutRetention()` — confirms default behavior (no 
tombstones) is unchanged
   - `testTryResurrectSplitWithinRetention()` — validates split resurrection 
within retention window
   - `testTryResurrectSplitExpired()` — verifies expired tombstones return 
`null` and are cleaned up
   - `testTryResurrectNonExistentSplit()` — confirms graceful handling of 
non-existent splits
   - `testCleanupExpiredTombstones()` — validates automatic cleanup on 
checkpoint completion
   - `testSnapshotAndRestoreWithTombstones()` — verifies tombstones survive 
checkpoint/restore cycles
   - `testBackwardCompatibilityWithOldCheckpoints()` — confirms new tracker can 
restore old-format checkpoints without tombstones
   
   Existing tests in `SplitAssignmentTrackerTest` continue to pass, confirming 
backward compatibility.
   
   ## Does this pull request potentially affect one of the following parts
   
   - **Dependencies** (does it add or upgrade a dependency): **no**
   - **The public API**, i.e., is any changed class annotated with `@Public(Evolving)`: **yes, additive only**. The only change to a `@PublicEvolving` surface is the new config option `SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS` (config options are conventionally `@PublicEvolving`). `SplitAssignmentTracker`, `SourceCoordinatorSerdeUtils`, and `SourceCoordinatorContext` are all `@Internal`.
   - **The serializers**: **yes** — `SourceCoordinatorSerdeUtils` version 
bumped to `VERSION_2` with full backward compatibility
   - **The runtime per-record code paths** (performance sensitive): **no** — 
tombstone operations occur only during split discovery/checkpointing, not 
per-record processing
   - **Anything that affects deployment or recovery** (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): **yes, minimally**. The coordinator checkpoint format moves to `VERSION_2`, and checkpoint state grows when retention is enabled and splits are removed. Tombstones are bounded by the retention duration and are cleaned up automatically on checkpoint completion.
   - **The S3 file system connector**: **no**
   
   ## Documentation
   
   - **Does this pull request introduce a new feature?** **yes** — 
framework-level tombstone retention for dynamic sources
   - **If yes, how is the feature documented?** 
     - Inline JavaDoc on `SplitAssignmentTracker` with comprehensive usage 
example for Kafka connector developers
     - Config option documentation in 
`SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS`
     - *(A follow-up PR will add user-facing docs once the Kafka connector integration is complete.)*
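   A connector's partition discovery could use the two tracker methods roughly as follows. Everything here is hypothetical and is not the Kafka connector's actual enumerator:

   - `SplitRetention` stands in for `markSplitRemoved()` and `tryResurrectSplit()`.
   - `InMemoryRetention` keeps tombstones forever, with no expiry, to keep the demo short.
   - `PartitionSplit.EARLIEST` is an illustrative sentinel rather than a real connector constant.
   - Mutating `offset` stands in for checkpointed reader progress.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical view of the two tracker methods the discovery logic needs. */
interface SplitRetention<SplitT> {
    void markSplitRemoved(String splitId, SplitT split, int subtaskId);

    SplitT tryResurrectSplit(String splitId);
}

/** Demo-only retention that never expires; a real tracker would enforce the retention window. */
final class InMemoryRetention<SplitT> implements SplitRetention<SplitT> {
    private final Map<String, SplitT> tombstones = new HashMap<>();

    @Override
    public void markSplitRemoved(String splitId, SplitT split, int subtaskId) {
        tombstones.put(splitId, split);
    }

    @Override
    public SplitT tryResurrectSplit(String splitId) {
        return tombstones.remove(splitId);
    }
}

/** Hypothetical split: a partition plus the offset to resume from. */
final class PartitionSplit {
    static final long EARLIEST = -1L; // illustrative sentinel, not a real connector constant

    final String partition;
    long offset; // stands in for checkpointed reader progress

    PartitionSplit(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

/** Hypothetical enumerator-side discovery handler. */
final class DiscoveryEnumerator {
    private final SplitRetention<PartitionSplit> retention;
    final Map<String, PartitionSplit> active = new HashMap<>();

    DiscoveryEnumerator(SplitRetention<PartitionSplit> retention) {
        this.retention = retention;
    }

    void onPartitionsDiscovered(Set<String> discovered) {
        // Partitions missing from this metadata version become tombstones instead of being purged.
        Iterator<Map.Entry<String, PartitionSplit>> it = active.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, PartitionSplit> entry = it.next();
            if (!discovered.contains(entry.getKey())) {
                int lastOwner = -1; // subtask assignment is not modeled in this sketch
                retention.markSplitRemoved(entry.getKey(), entry.getValue(), lastOwner);
                it.remove();
            }
        }
        // New or reappearing partitions resume from a retained split when one is available.
        for (String partition : discovered) {
            if (!active.containsKey(partition)) {
                PartitionSplit restored = retention.tryResurrectSplit(partition);
                active.put(partition,
                        restored != null ? restored : new PartitionSplit(partition, PartitionSplit.EARLIEST));
            }
        }
    }
}

class DiscoveryDemo {
    public static void main(String[] args) {
        DiscoveryEnumerator enumerator = new DiscoveryEnumerator(new InMemoryRetention<>());
        enumerator.onPartitionsDiscovered(Set.of("p0", "p1")); // metadata version n
        enumerator.active.get("p1").offset = 42;               // pretend p1 made progress
        enumerator.onPartitionsDiscovered(Set.of("p0"));       // version n-1: p1 disappears
        enumerator.onPartitionsDiscovered(Set.of("p0", "p1")); // version n again: p1 returns
        System.out.println(enumerator.active.get("p1").offset); // 42, not EARLIEST
    }
}
```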
   
   ## Was generative AI tooling used to co-author this PR?
   
   - [x] Yes — Claude Code was used as a pair-programming assistant for 
discussing the architecture, reviewing implementation patterns, and structuring 
the tombstone retention logic. All code was written, understood, and verified 
by the author.
   
   **Generated-by:** Claude Opus 4.8


-- 
This is an automated message from the Apache Git Service.