1996fanrui opened a new pull request, #27905:
URL: https://github.com/apache/flink/pull/27905

   ## What is the purpose of the change
   
   Add spill-to-disk infrastructure for restoring channel state during unaligned 
checkpoint recovery with filtering (FLINK-38544).
   
   When unaligned checkpoint recovery runs with state filtering (rescaling), 
the current implementation holds all recovered buffers in memory, in the 
Network Buffer Pool, which can cause memory pressure and deadlocks. This PR 
introduces a spill-to-disk mechanism that:
   
   1. **Decouples Source Buffers from the Network Buffer Pool**: Uses 
heap-allocated buffers (Source Buffers) for reading S3 data, so reads do not 
compete with the Network Buffer Pool.
   2. **Three-path routing for filtered buffers**:
      - **P1 (Memory)**: When no disk data exists, deliver filtered buffers 
directly.
      - **P2 (Spill)**: When the Network Buffer Pool is exhausted, spill to 
local disk.
      - **P3 (Replay)**: When the disk already has data, spill the current 
buffer and replay from disk, which preserves FIFO ordering.
   3. **Phase 2 drain**: After all S3 data is read, drain the remaining disk 
data using blocking buffer requests. This is safe because all heap Source 
Buffers have been released by then.
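
The routing described above can be illustrated with a small, self-contained sketch. It is not the code in this PR: `ThreePathRoutingSketch`, its `Path` enum, and `recycleNetworkBuffer()` are hypothetical names, an in-memory queue stands in for the spill files, and a plain counter stands in for the Network Buffer Pool. The order of the checks (disk first, then pool) is an assumption about how P1, P2, and P3 combine.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration of P1/P2/P3 routing; not the PR's actual code. */
final class ThreePathRoutingSketch {
    enum Path { MEMORY, SPILL, REPLAY }

    // Assumption: an in-memory queue stands in for the on-disk spill files.
    private final Queue<byte[]> disk = new ArrayDeque<>();
    private final List<byte[]> delivered = new ArrayList<>();
    // Assumption: a counter stands in for the Network Buffer Pool.
    private int freeNetworkBuffers;

    ThreePathRoutingSketch(int freeNetworkBuffers) {
        this.freeNetworkBuffers = freeNetworkBuffers;
    }

    /** Routes one filtered buffer and reports which path it took. */
    Path onFilteredBuffer(byte[] data) {
        if (!disk.isEmpty()) {
            // P3: older data is already on disk, so queue behind it to keep
            // FIFO order, then replay as much as the pool currently allows.
            disk.add(data);
            while (!disk.isEmpty() && freeNetworkBuffers > 0) {
                deliver(disk.poll());
            }
            return Path.REPLAY;
        }
        if (freeNetworkBuffers == 0) {
            // P2: the non-blocking request found no buffer, so spill.
            disk.add(data);
            return Path.SPILL;
        }
        // P1: nothing on disk and a network buffer is free.
        deliver(data);
        return Path.MEMORY;
    }

    /** Phase 2: drains whatever is left on disk after the read phase. */
    void drainDiskData() {
        while (!disk.isEmpty()) {
            if (freeNetworkBuffers == 0) {
                // Simulates a blocking request returning once a buffer is recycled.
                freeNetworkBuffers++;
            }
            deliver(disk.poll());
        }
    }

    /** Simulates downstream recycling one network buffer. */
    void recycleNetworkBuffer() {
        freeNetworkBuffers++;
    }

    List<byte[]> delivered() {
        return delivered;
    }

    private void deliver(byte[] data) {
        freeNetworkBuffers--;
        delivered.add(data);
    }
}
```

With a pool of one buffer, the first filtered buffer takes P1, the second takes P2, and a third arriving after one buffer is recycled takes P3; delivery order still matches arrival order once `drainDiskData()` has run.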
   
   ### Core Components
   
   - `SpillFileWriter`: Writes raw buffer bytes to spill files
   - `SpillFileReader`: Reads raw buffer bytes back from spill files
   - `SpillingBufferManager`: Manages the spill/replay lifecycle with FIFO 
ordering, 64 MB file rotation, reference counting, and cleanup of old attempts
   - `ChannelStateSerializer.wrapWithoutRecycle()`: Supports a heap buffer 
lifecycle in which the caller manages recycling
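
As a rough illustration of spill files with size-based rotation, the sketch below writes records to numbered files and reads them back in write order. It makes several assumptions that are not taken from this PR: the class name `SpillRotationSketch`, the `spill-N.bin` file naming, and the length-prefixed record framing are all hypothetical, and the rotation threshold is a constructor parameter rather than a fixed 64 MB.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical spill writer/reader with rotation; not the PR's actual code. */
final class SpillRotationSketch {
    private final Path dir;
    private final long maxFileBytes;
    private final List<Path> files = new ArrayList<>();
    private DataOutputStream out;
    private long written;

    SpillRotationSketch(Path dir, long maxFileBytes) {
        this.dir = dir;
        this.maxFileBytes = maxFileBytes;
    }

    /** Appends one record, starting a new file if the current one would overflow. */
    void write(byte[] data) throws IOException {
        long recordSize = Integer.BYTES + data.length;
        if (out == null || (written > 0 && written + recordSize > maxFileBytes)) {
            rotate();
        }
        // Assumption: each record is framed as a 4-byte length followed by the bytes.
        out.writeInt(data.length);
        out.write(data);
        written += recordSize;
    }

    private void rotate() throws IOException {
        if (out != null) {
            out.close();
        }
        Path file = dir.resolve("spill-" + files.size() + ".bin");
        files.add(file);
        out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(file)));
        written = 0;
    }

    /** Closes the writer, replays every record in write order, and deletes the files. */
    List<byte[]> closeAndReadAll() throws IOException {
        if (out != null) {
            out.close();
            out = null;
        }
        List<byte[]> records = new ArrayList<>();
        for (Path file : files) {
            try (DataInputStream in =
                    new DataInputStream(new BufferedInputStream(Files.newInputStream(file)))) {
                while (true) {
                    int length;
                    try {
                        length = in.readInt();
                    } catch (EOFException endOfFile) {
                        break; // no more records in this file
                    }
                    byte[] data = new byte[length];
                    in.readFully(data);
                    records.add(data);
                }
            }
            Files.delete(file);
        }
        files.clear();
        return records;
    }

    int fileCount() {
        return files.size();
    }
}
```

Reading files in creation order and records in file order is what keeps replay FIFO in this sketch; a single record larger than the threshold still gets a file of its own rather than being rejected.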
   
   ### Integration Changes
   
   - `InputChannelRecoveredStateHandler`: Adds heap buffer allocation, 
three-path routing, and the Phase 2 drain
   - `SequentialChannelStateReaderImpl`: Passes spill directories and calls 
`drainDiskData()` after the read phase
   - `RecoveredInputChannel`: Replaces `requestBufferBlocking` with the 
non-blocking `requestBuffer`
   
   ## Brief change log
   
   - Add `SpillFileWriter`, `SpillFileReader`, and `SpillingBufferManager` for 
spill-to-disk infrastructure
   - Add `ChannelStateSerializer.wrapWithoutRecycle()` for filtering buffer 
lifecycle
   - Update `InputChannelRecoveredStateHandler` with heap buffer allocation, 
three-path routing, and Phase 2 disk drain
   - Update `SequentialChannelStateReaderImpl` to pass spill directories and 
trigger the Phase 2 drain
   - Change `RecoveredInputChannel.requestBufferBlocking` to non-blocking 
`requestBuffer`
   
   ## Verifying this change
   
   This change is verified by new unit tests:
   
   - `SpillFileWriterReaderTest`: Covers spill file I/O, file rotation, large 
buffers
   - `SpillingBufferManagerTest`: Covers replay ordering, checkpoint iterator, 
concurrent ref counting, FIFO guarantee
   - `InputChannelRecoveredStateHandlerTest`: Covers heap buffer allocation, 
source buffer isolation, sequential channel processing, no-filtering 
compatibility
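
For readers unfamiliar with the reference-counting idea that the concurrent test exercises, here is a minimal sketch under stated assumptions. `RefCountedSpillFileSketch` and its methods are hypothetical and not part of this PR; the assumption is that a spill file can be deleted once the owner and every replayed buffer referencing it have released their references.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted spill file handle; not the PR's actual code. */
final class RefCountedSpillFileSketch {
    // The owner (for example a manager) holds the initial reference.
    private final AtomicInteger refs = new AtomicInteger(1);
    // Assumption: deletion is injected as a callback so the sketch needs no real file.
    private final Runnable deleter;

    RefCountedSpillFileSketch(Runnable deleter) {
        this.deleter = deleter;
    }

    /** Adds a reference; fails if the file has already been released completely. */
    void retain() {
        while (true) {
            int current = refs.get();
            if (current == 0) {
                throw new IllegalStateException("spill file already released");
            }
            // CAS loop so a concurrent final release cannot be undone.
            if (refs.compareAndSet(current, current + 1)) {
                return;
            }
        }
    }

    /** Drops a reference; returns true if this call triggered the deletion. */
    boolean release() {
        int remaining = refs.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (remaining == 0) {
            deleter.run();
            return true;
        }
        return false;
    }

    int refCount() {
        return refs.get();
    }
}
```

Because only the call that moves the count to zero runs the callback, releases arriving from several threads trigger the deletion at most once.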
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: no
   - The runtime per-record code paths (performance sensitive): no
   - Anything that affects deployment or recovery: **yes** (recovery with state 
filtering)
   - The S3 file system connector: no
   
   ## Documentation
   
   - Does this pull request introduce a new feature? no (internal improvement 
to existing recovery mechanism)
   - If yes, how is the feature documented? not applicable
   


