1996fanrui opened a new pull request, #27905:
URL: https://github.com/apache/flink/pull/27905
## What is the purpose of the change
Add spill-to-disk infrastructure for recovering channel state from unaligned
checkpoints with filtering (FLINK-38544).
When unaligned checkpoint recovery runs with state filtering (rescaling),
the current implementation holds all recovered buffers in memory (Network
Buffer Pool), which can cause memory pressure and deadlocks. This PR introduces
a spill-to-disk mechanism that:
1. **Decouples Source Buffers from Network Buffer Pool**: Uses
heap-allocated buffers for reading S3 data (Source Buffers), avoiding
competition with the Network Buffer Pool
2. **Three-path routing for filtered buffers**:
   - **P1 (Memory)**: When no disk data exists, deliver filtered buffers
     directly
   - **P2 (Spill)**: When Network Buffer Pool is exhausted, spill to local
     disk
   - **P3 (Replay)**: When disk has data, spill current buffer and replay
     from disk (FIFO ordering)
3. **Phase 2 drain**: After all S3 data is read, drain remaining disk data
using blocking buffer requests (safe because all heap Source Buffers are
released)
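
The three-path routing described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `RoutingSketch`, `Route`, and `choosePath` are hypothetical names, and the order in which the two conditions are checked is an assumption (disk data is checked first so that a newly filtered buffer can never overtake older spilled buffers).

```java
/** Hypothetical sketch of the P1/P2/P3 routing decision; not a Flink API. */
final class RoutingSketch {

    /** The three delivery paths named in the PR description. */
    enum Route { MEMORY, SPILL, REPLAY }

    /**
     * Picks a path for one filtered buffer.
     *
     * @param diskHasData whether earlier buffers are still waiting on disk
     * @param networkBufferAvailable whether a non-blocking buffer request succeeded
     */
    static Route choosePath(boolean diskHasData, boolean networkBufferAvailable) {
        if (diskHasData) {
            // P3: older data is on disk, so the current buffer is appended
            // to disk and delivery continues from the oldest spilled buffer.
            return Route.REPLAY;
        }
        if (!networkBufferAvailable) {
            // P2: nothing on disk yet, but no network buffer is free.
            return Route.SPILL;
        }
        // P1: nothing on disk and a network buffer is free.
        return Route.MEMORY;
    }
}
```

Checking the disk state before the pool state is what keeps delivery in FIFO order in this sketch: once anything has been spilled, every later buffer goes through disk until the backlog is drained.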
### Core Components
- `SpillFileWriter`: Writes raw buffer bytes to spill files
- `SpillFileReader`: Reads raw buffer bytes back from spill files
- `SpillingBufferManager`: Manages spill/replay lifecycle with FIFO
ordering, 64MB file rotation, reference counting, and old attempt cleanup
- `ChannelStateSerializer.wrapWithoutRecycle()`: Supports heap buffer
lifecycle where caller manages recycling
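
To illustrate the writer/reader pair, here is a minimal sketch of spilling buffers to a file and reading them back in FIFO order. It is not the PR's `SpillFileWriter` or `SpillFileReader`: the class `SpillFileSketch` and its methods are hypothetical, and the length-prefixed framing is an assumption, since the description does not specify the on-disk format. File rotation and reference counting are omitted.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a spill file round trip; not the PR's classes. */
final class SpillFileSketch {

    /** Appends each buffer as [int length][raw bytes] (assumed framing). */
    static void write(Path file, byte[][] buffers) throws IOException {
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(file)))) {
            for (byte[] buffer : buffers) {
                out.writeInt(buffer.length);
                out.write(buffer);
            }
        }
    }

    /** Reads buffers back in the order they were written (FIFO). */
    static List<byte[]> readAll(Path file) throws IOException {
        List<byte[]> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(file)))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException endOfFile) {
                    break; // no further length prefix: end of spill data
                }
                byte[] buffer = new byte[length];
                in.readFully(buffer);
                result.add(buffer);
            }
        }
        return result;
    }

    /** Writes the buffers to a temp file, reads them back, then deletes the file. */
    static List<byte[]> roundTrip(byte[]... buffers) {
        try {
            Path file = Files.createTempFile("spill-sketch", ".bin");
            try {
                write(file, buffers);
                return readAll(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A call such as `SpillFileSketch.roundTrip(a, b, c)` returns copies of `a`, `b`, and `c` in that order, which is the FIFO property the replay path depends on.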
### Integration Changes
- `InputChannelRecoveredStateHandler`: Updated with heap buffer allocation,
three-path routing, and Phase 2 drain
- `SequentialChannelStateReaderImpl`: Passes spill directories and calls
`drainDiskData()` after read phase
- `RecoveredInputChannel`: Changed `requestBufferBlocking` to non-blocking
`requestBuffer`
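
The switch from a blocking to a non-blocking buffer request can be illustrated with a small stand-alone sketch. It does not use Flink's buffer pool classes: `NonBlockingRequestSketch` and its members are hypothetical, and a `BlockingQueue` of byte arrays stands in for the Network Buffer Pool. The point is only that a failed request returns immediately, which lets the caller spill instead of waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for a bounded buffer pool; not a Flink class. */
final class NonBlockingRequestSketch {

    private final BlockingQueue<byte[]> pool;
    int delivered = 0;
    int spilled = 0;

    NonBlockingRequestSketch(int poolSize, int bufferSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new byte[bufferSize]);
        }
    }

    /** Non-blocking request: returns null at once when the pool is exhausted. */
    byte[] requestBuffer() {
        return pool.poll();
    }

    /** Returns a buffer to the pool so later requests can succeed. */
    void recycle(byte[] buffer) {
        pool.offer(buffer);
    }

    /**
     * Tries to deliver in memory; on pool exhaustion it records a spill
     * instead of waiting. Returns true if the data was delivered in memory.
     */
    boolean deliverOrSpill(byte[] data) {
        byte[] target = requestBuffer();
        if (target == null) {
            spilled++; // a real handler would write the data to a spill file here
            return false;
        }
        System.arraycopy(data, 0, target, 0, Math.min(data.length, target.length));
        delivered++;
        return true;
    }
}
```

With a blocking request, the second call in a one-buffer pool would wait for a recycle that may never come while the reader thread itself holds the data; returning `null` turns that potential deadlock into a spill.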
## Brief change log
- Add `SpillFileWriter`, `SpillFileReader`, and `SpillingBufferManager` for
spill-to-disk infrastructure
- Add `ChannelStateSerializer.wrapWithoutRecycle()` for filtering buffer
lifecycle
- Update `InputChannelRecoveredStateHandler` with heap buffer allocation,
three-path routing, and Phase 2 disk drain
- Update `SequentialChannelStateReaderImpl` to pass spill dirs and trigger
Phase 2 drain
- Change `RecoveredInputChannel.requestBufferBlocking` to non-blocking
`requestBuffer`
## Verifying this change
This change is verified by new unit tests:
- `SpillFileWriterReaderTest`: Covers spill file I/O, file rotation, large
buffers
- `SpillingBufferManagerTest`: Covers replay ordering, checkpoint iterator,
concurrent ref counting, FIFO guarantee
- `InputChannelRecoveredStateHandlerTest`: Covers heap buffer allocation,
source buffer isolation, sequential channel processing, no-filtering
compatibility
## Does this pull request potentially affect one of the following parts
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: **yes** (recovery with state
filtering)
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no (internal improvement
to existing recovery mechanism)
- If yes, how is the feature documented? not applicable