Dmitriy Linevich created FLINK-35351:
Summary: Restore from unaligned checkpoint with custom partitioner fails.
Key: FLINK-35351
URL: https://issues.apache.org/jira/browse/FLINK-35351
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Reporter: Dmitriy Linevich
Restoring from an unaligned checkpoint of a job that uses a custom partitioner (specifically because it uses SubtaskStateMapper.FULL) fails when the parallelism of one vertex is changed:
{code:java}
[db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN Sink: sink (3/3)#0 (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879) ~[classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960) ~[classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) [classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) [classes/:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.io.IOException: Corrupt stream, found tag: -1
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222) ~[classes/:?]
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44) ~[classes/:?]
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53) ~[classes/:?]
	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) ~[classes/:?]
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) ~[classes/:?]
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) ~[classes/:?]
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103) ~[classes/:?]
	... 10 more
{code}
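To make the role of SubtaskStateMapper.FULL more concrete, here is a minimal, self-contained sketch contrasting a FULL-style and a ROUND_ROBIN-style assignment of old subtasks' state to new subtasks on rescale. It assumes that FULL hands every new subtask the state of all old subtasks, and that ROUND_ROBIN hands new subtask i the old subtasks i, i + newParallelism, and so on. The class and method names are hypothetical; this is not Flink's SubtaskStateMapper implementation.

{code:java}
import java.util.stream.IntStream;

// Hypothetical sketch: models the assignment semantics of a FULL and a ROUND_ROBIN
// subtask state mapper. Not Flink's SubtaskStateMapper implementation.
final class StateMapperSketch {

    private StateMapperSketch() {}

    // FULL: every new subtask receives the state of every old subtask, so the
    // new subtask index and the new parallelism do not influence the result.
    static int[] fullOldSubtasks(int newSubtaskIndex, int oldParallelism, int newParallelism) {
        return IntStream.range(0, oldParallelism).toArray();
    }

    // ROUND_ROBIN: new subtask i receives old subtasks i, i + newParallelism, ...
    static int[] roundRobinOldSubtasks(int newSubtaskIndex, int oldParallelism, int newParallelism) {
        return IntStream.range(0, oldParallelism)
                .filter(old -> old % newParallelism == newSubtaskIndex)
                .toArray();
    }
}
{code}

For example, when scaling from 3 subtasks down to 2, the FULL variant gives new subtask 1 the state of old subtasks [0, 1, 2], while the ROUND_ROBIN variant gives it only [1]. FULL therefore replicates state instead of partitioning it.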
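Example: a job Source[2] -> Sink[3] is checkpointed and then restored as Source[1] -> Sink[3] (source parallelism reduced from 2 to 1, sink parallelism unchanged at 3).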
The failure happens because, at checkpoint time, the in-flight data in the source's outputs and in the sink's inputs can contain only one part of a stream record. After restore, the two parts of one record can be sent to different input channels of the sink. Because the sink's parallelism did not change, its inputs are not rescaled: each input knows only about its own channels, not about the others, so the split record cannot be reassembled.
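The mechanism can be illustrated with a toy model. The record format here is an assumption for illustration, not Flink's wire format: each record is encoded as a tag byte, a 4-byte length, and a payload, and every input channel has its own deserializer that buffers partial records between network buffers. A record split across two buffers is reassembled only when both halves arrive on the same channel. The class names are hypothetical.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink's wire format): each record is [tag byte][int length][payload].
final class SplitRecordDemo {
    static final byte RECORD_TAG = 0;

    static byte[] serialize(String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(RECORD_TAG).putInt(payload.length).put(payload).array();
    }

    // Per-channel deserializer that keeps partial records between buffers.
    static final class ChannelDeserializer {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

        List<String> onBuffer(byte[] buffer) throws IOException {
            pending.write(buffer, 0, buffer.length);
            ByteBuffer in = ByteBuffer.wrap(pending.toByteArray());
            List<String> records = new ArrayList<>();
            while (in.remaining() >= 5) { // enough bytes for tag + length
                in.mark();
                byte tag = in.get();
                if (tag != RECORD_TAG) {
                    // The channel started reading in the middle of a record.
                    throw new IOException("Corrupt stream, found tag: " + tag);
                }
                int length = in.getInt();
                if (in.remaining() < length) { // record continues in a later buffer
                    in.reset();
                    break;
                }
                byte[] payload = new byte[length];
                in.get(payload);
                records.add(new String(payload, StandardCharsets.UTF_8));
            }
            // Keep the unconsumed tail for the next buffer on this same channel.
            pending.reset();
            pending.write(in.array(), in.position(), in.remaining());
            return records;
        }
    }
}
{code}

Splitting the 16-byte encoding of "hello world" after 8 bytes and feeding both halves to one deserializer yields the record. Feeding only the second half to a fresh deserializer, as happens when the halves land on different channels, throws "Corrupt stream, found tag: 108", because the first byte it sees is the payload character 'l'. This mirrors the shape of the "Corrupt stream, found tag: -1" failure in the stack trace, although the real byte values differ.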
I think the fix is to rescale the sink's inputs whenever the source's outputs were rescaled.
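The proposed rule can be sketched as a small predicate: a vertex's input state must be rescaled if its own parallelism changed or if any upstream producer's parallelism changed. The types below are hypothetical stand-ins, not Flink's StateAssignmentOperation, ExecutionJobVertex, or IntermediateResult classes, and the sketch leaves out the SubtaskStateMapper check that the snippet in this report also performs.

{code:java}
import java.util.List;

// Hypothetical model of the proposed rule; not Flink's StateAssignmentOperation.
final class InputRescaleRule {

    private InputRescaleRule() {}

    // Old and new parallelism of one vertex (hypothetical helper type).
    static final class Parallelism {
        final int oldValue;
        final int newValue;

        Parallelism(int oldValue, int newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        boolean changed() {
            return oldValue != newValue;
        }
    }

    static boolean inputsNeedRescale(Parallelism self, List<Parallelism> upstreamProducers) {
        if (self.changed()) {
            return true;
        }
        // Parallelism unchanged: still rescale the inputs if any producer was rescaled,
        // because the producer's output buffers were redistributed on restore.
        return upstreamProducers.stream().anyMatch(Parallelism::changed);
    }
}
{code}

For the example in this report (source 2 -> 1, sink 3 -> 3), inputsNeedRescale returns true for the sink even though the sink's own parallelism is unchanged.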
To fix this, the following needs to be added [here|https://github.com/apache/flink/blob/4165bac27bda4457e5940a994d923242d4a271dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L424]:
{code:java}
boolean noNeedRescale = stateAssignment.executionJobVertex
.getJobVertex()
.getInputs()
.stream()
.map(JobEdge::getDownstreamSubtaskStateMapper)
.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
&& stateAssignment.executionJobVertex
.getInputs()
.stream()
.map(IntermediateResult::getProducer)