[ 
https://issues.apache.org/jira/browse/FLINK-40016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093057#comment-18093057
 ] 

Rui Fan commented on FLINK-40016:
---------------------------------

h2. LocalInputChannel may double-persist recovered buffers into channel 
state for one checkpoint

LocalInputChannel may snapshot recovered buffers twice into the UC channel 
state, corrupting it: on restore, deserialization reads the duplicated bytes 
and fails. (Introduced by FLINK-39018; RemoteInputChannel is unaffected.)
h2. Root cause:

When an unaligned checkpoint starts (the first UC barrier is received, or an 
aligned checkpoint times out), checkpointStarted() already persists the 
recovered buffers once via startPersisting() (state → BARRIER_PENDING).

After that, we still need to snapshot, for each channel, the buffers that 
arrive from upstream ahead of the barrier.

getBufferAndAvailability() is shared by recovered buffers (toBeConsumedBuffers) 
and upstream data (subpartitionView), and it calls maybePersist() on every 
buffer. Before the barrier arrives, the task polls those same recovered 
buffers, and maybePersist(), still in BARRIER_PENDING, writes each one a 
second time. maybePersist() is only meant for upstream in-flight data; 
recovered buffers are already fully captured by startPersisting().
h2. Fix:

Persist only on the upstream path, as RemoteInputChannel does.
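
A minimal, self-contained sketch of the double write and of the fix, for illustration only. It is not the Flink code: ToyLocalChannel, the persistOnlyUpstream flag, the string "buffers", the "BARRIER" marker, and the boolean that stands in for BARRIER_PENDING are all hypothetical simplifications. Only the method names checkpointStarted(), getBufferAndAvailability(), and maybePersist() and the two queue names are borrowed from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DoublePersistSketch {

    /** Toy stand-in for LocalInputChannel (hypothetical, not Flink's class). */
    static final class ToyLocalChannel {
        private final Deque<String> toBeConsumedBuffers = new ArrayDeque<>(); // recovered buffers
        private final Deque<String> subpartitionView = new ArrayDeque<>();    // upstream data
        private final List<String> channelState = new ArrayList<>();          // what the checkpoint stores
        private final boolean persistOnlyUpstream;                            // false = buggy, true = fixed
        private boolean barrierPending;                                       // stands in for BARRIER_PENDING

        ToyLocalChannel(boolean persistOnlyUpstream, List<String> recovered, List<String> upstream) {
            this.persistOnlyUpstream = persistOnlyUpstream;
            toBeConsumedBuffers.addAll(recovered);
            subpartitionView.addAll(upstream);
        }

        /** Models startPersisting(): every recovered buffer is captured once here. */
        void checkpointStarted() {
            channelState.addAll(toBeConsumedBuffers);
            barrierPending = true;
        }

        /** Shared poll path: recovered buffers first, then upstream data. */
        String getBufferAndAvailability() {
            if (!toBeConsumedBuffers.isEmpty()) {
                String buffer = toBeConsumedBuffers.poll();
                if (!persistOnlyUpstream) {
                    maybePersist(buffer); // buggy path: writes the recovered buffer again
                }
                return buffer;
            }
            String buffer = subpartitionView.poll();
            if (buffer == null) {
                return null;
            }
            if ("BARRIER".equals(buffer)) {
                barrierPending = false; // barrier arrived: stop persisting for this checkpoint
            } else {
                maybePersist(buffer);   // upstream in-flight data ahead of the barrier
            }
            return buffer;
        }

        private void maybePersist(String buffer) {
            if (barrierPending) {
                channelState.add(buffer);
            }
        }
    }

    /** Runs one checkpoint over two recovered buffers and returns the persisted state. */
    static List<String> persistedState(boolean persistOnlyUpstream) {
        ToyLocalChannel channel = new ToyLocalChannel(
                persistOnlyUpstream, List.of("r1", "r2"), List.of("u1", "BARRIER", "u2"));
        channel.checkpointStarted();
        while (channel.getBufferAndAvailability() != null) {
            // drain the channel the way the task would
        }
        return channel.channelState;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + persistedState(false)); // [r1, r2, r1, r2, u1]
        System.out.println("fixed: " + persistedState(true));  // [r1, r2, u1]
    }
}
```

In the buggy run, r1 and r2 appear twice in the toy channel state, which corresponds to the duplicated bytes that break deserialization on restore. With persistence restricted to the upstream path, each recovered buffer is written exactly once.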

> UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails with 
> "Corrupt stream, found tag" (in-flight stream desync) on rescale recovery
> ------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40016
>                 URL: https://issues.apache.org/jira/browse/FLINK-40016
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Martijn Visser
>            Assignee: Rui Fan
>            Priority: Major
>             Fix For: 2.4.0
>
>         Attachments: 
> UnalignedCheckpointRescaleITCase_upscale_pipeline_20_to_21_FAILED.log
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=76170&view=results]
>  (leg: test_cron_azure tests)
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:180)
>       at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:217)
>       at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint(UnalignedCheckpointRescaleITCase.java:683)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
> backoffTimeMS=100)
> Caused by: java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=5}
> Caused by: java.io.IOException: Corrupt stream, found tag: 8
> {code}
> {{org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint}}
>  (parameter {{{}[19]{}}}) fails on rescale recovery: the restored in-flight 
> byte stream is misaligned, so {{StreamElementSerializer}} reads a data byte 
> where an element tag is expected. Valid tags are 0-6; "tag 8" is out of 
> range, confirming a stream desync rather than a format mismatch.
> Suspected to be in the recovered in-flight-buffer / filtering-recovery path 
> that landed in master 2026-02 .. 2026-04 (FLINK-39018 buffer migration from 
> RecoveredInputChannel to physical channels; FLINK-38930 filtering record 
> before processing without spilling / heap-buffer fallback during recovery; 
> FLINK-39519 reusable heap segment for pre-filter source buffers). The failing 
> build's head commit (4902753429, FLINK-35562) is a flink-table-planner 
> test-only change and cannot have introduced this, so the defect is latent in 
> master.
> Related: FLINK-22197 (same test/method but a {{{}NoSuchFileException{}}}, 
> different cause), FLINK-38643 (hang), FLINK-35351 / FLINK-39162 (closed 
> UC-corruption, custom-partitioner specific).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
