[
https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-38267:
----------------------------
Component/s: Runtime / Checkpointing
> Job cannot be recovered from unaligned checkpoint after rescaling when one
> task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38267
> URL: https://issues.apache.org/jira/browse/FLINK-38267
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 2.0.0, 1.20.2, 2.1.0, 2.2.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.1, 1.20.3, 2.2.0, 2.1.1
>
> Attachments: image-2025-08-19-13-58-15-029.png
>
>
> h1. 1. Phenomenon
> A job cannot be recovered from a UC (unaligned checkpoint) after rescaling.
> It fails with the following exception:
> {code:java}
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().{code}
>
> UC stands for unaligned checkpoint throughout this ticket.
> h1. 2. Reason
> h2. 2.1 What types of jobs trigger this bug?
> A job is affected when one upstream task has multiple output exchanges that
> include both UC-supported exchanges (such as hash or rebalance) and at least
> one UC-unsupported exchange (such as forward or rescale).
> The same applies when one downstream task has multiple input exchanges that
> include both UC-supported exchanges (such as hash or rebalance) and at least
> one UC-unsupported exchange (such as forward or rescale).
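> The condition above can be sketched as a small predicate. This is a minimal
> illustration only: the {{Exchange}} enum and {{isAffected}} are hypothetical
> names, not Flink classes, and the UC support flags simply mirror the examples
> given in this ticket.

```java
import java.util.List;

/** Hypothetical model (not Flink's API): classifies a task by the exchanges on one side of it. */
public class MixedExchangeCheck {

    /** Partitioner kinds named in this ticket, with the UC support described there. */
    enum Exchange {
        HASH(true),
        REBALANCE(true),
        FORWARD(false),
        RESCALE(false);

        final boolean supportsUnalignedCheckpoint;

        Exchange(boolean supportsUnalignedCheckpoint) {
            this.supportsUnalignedCheckpoint = supportsUnalignedCheckpoint;
        }
    }

    /**
     * Returns true if the exchanges on one side of a task (all of its outputs,
     * or all of its inputs) mix UC-supported and UC-unsupported kinds.
     */
    static boolean isAffected(List<Exchange> exchangesOnOneSide) {
        boolean anySupported =
                exchangesOnOneSide.stream().anyMatch(e -> e.supportsUnalignedCheckpoint);
        boolean anyUnsupported =
                exchangesOnOneSide.stream().anyMatch(e -> !e.supportsUnalignedCheckpoint);
        return anySupported && anyUnsupported;
    }

    public static void main(String[] args) {
        // Source task of the reproducer: one hash output and one forward output.
        System.out.println(isAffected(List.of(Exchange.HASH, Exchange.FORWARD)));
        // Only UC-supported exchanges: not affected.
        System.out.println(isAffected(List.of(Exchange.HASH, Exchange.REBALANCE)));
        // A single forward exchange: not affected.
        System.out.println(isAffected(List.of(Exchange.FORWARD)));
    }
}
```

> Under these assumptions, only the first call reports an affected task,
> because it is the only one that mixes both kinds of exchange.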
> h2. 2.2 Why does this bug happen?
> When a job is rescaled and recovered from an unaligned checkpoint, Flink
> needs to redistribute the in-flight buffers (input buffers on the downstream
> side and output buffers on the upstream side).
> ForwardPartitioner and RescalePartitioner exchanges do not support unaligned
> checkpoints, so the redistribution logic is not expected to run for them. In
> the code:
> * For input buffer redistribution[1], if the current task has no input buffer
> state and the upstream task has no output buffer state, the code returns
> directly without any redistribution.
> * For output buffer redistribution[2], if the current task has no output
> buffer state and the downstream task has no input buffer state, the code
> returns directly without any redistribution.
> However, this check does not work when the upstream task has multiple output
> exchanges.
> The following DAG is an example with 3 tasks and 2 exchanges (hash and
> forward).
> * The hash exchange supports unaligned checkpoints
> * The forward exchange does not support unaligned checkpoints
>
> !image-2025-08-19-13-58-15-029.png|width=786,height=421!
> When the job is recovered from a UC after rescaling, the *Map after forward*
> task checks its own input buffer state and the Source’s output buffer state.
> In this case the Source task does have output buffer state, but that state
> belongs to the hash exchange, not the forward exchange.
> As a result, redistribution is unexpectedly invoked for
> {*}Map after forward{*}, which leads to the exception above.
> Likewise, from the perspective of the upstream task (Source), which has 2
> output exchanges, the forward exchange should not run the rescale logic even
> if the hash exchange has state.
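> To make the failure mode concrete, the task-level check can be modeled as
> below. This is a simplified sketch rather than the actual
> {{StateAssignmentOperation}} code: the class, the method names, and the
> buffer counts are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical model of a task-level in-flight data check (not Flink code). */
public class TaskLevelCheckSketch {

    /** A task counts as having output state as soon as any of its exchanges has buffers. */
    static boolean taskHasOutputState(Map<String, Integer> outputBuffersPerExchange) {
        return outputBuffersPerExchange.values().stream().anyMatch(count -> count > 0);
    }

    /** Task-level rule: redistribution is skipped only if neither side has any state. */
    static boolean redistributesInput(
            boolean currentTaskHasInputState, boolean upstreamTaskHasOutputState) {
        return currentTaskHasInputState || upstreamTaskHasOutputState;
    }

    public static void main(String[] args) {
        // Assumed buffer counts for the Source task in the example DAG.
        Map<String, Integer> sourceOutputs = new LinkedHashMap<>();
        sourceOutputs.put("hash", 12);
        sourceOutputs.put("forward", 0);

        // "Map after forward" has no input buffers of its own, yet the check
        // still selects it for redistribution because of the hash exchange.
        boolean redistributed =
                redistributesInput(false, taskHasOutputState(sourceOutputs));
        System.out.println("Map after forward redistributed: " + redistributed);
    }
}
```

> In this model the forward exchange contributes no buffers, but the flag is
> computed for the whole task, so the state of the hash exchange alone is
> enough to trigger redistribution for the forward edge.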
> h2. 2.3 Reproduce
> The following job can reproduce this bug easily.
>
> {code:java}
> import org.apache.commons.math3.random.RandomDataGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Reproduces this issue:
>  * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
>  * Did you change the partitioner to forward or rescale?
>  * It may also help to add an explicit shuffle().
>  */
> public class UnalignedCheckpointBugDemo {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("rest.port", "12348");
>         conf.setString("execution.checkpointing.unaligned.enabled", "true");
>         conf.setString("execution.checkpointing.interval", "10s");
>         conf.setString("execution.checkpointing.min-pause", "8s");
>         // The adaptive scheduler lets the job be rescaled while it is running.
>         conf.setString("jobmanager.scheduler", "adaptive");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         // Without chaining, Source -> "Map after forward" is a separate forward exchange.
>         env.disableOperatorChaining();
>         env.setParallelism(5);
>
>         SingleOutputStreamOperator<String> stream1 = env.fromSource(
>                 new DataGeneratorSource<>(
>                         value -> new RandomDataGenerator().nextHexString(300),
>                         Long.MAX_VALUE,
>                         RateLimiterStrategy.perSecond(100000),
>                         Types.STRING),
>                 WatermarkStrategy.noWatermarks(),
>                 "Source Task");
>
>         // Hash exchange (supports UC), followed by a slow map.
>         stream1
>                 .keyBy(new KeySelectorFunction())
>                 .map(x -> {
>                     Thread.sleep(50);
>                     return x;
>                 }).name("Map after hash");
>
>         // Forward exchange (does not support UC).
>         stream1.map(x -> {
>             Thread.sleep(5);
>             return x;
>         }).name("Map after forward");
>
>         env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
>     }
>
>     /** Routes every record to the same key. */
>     private static class KeySelectorFunction implements KeySelector<String, Integer> {
>         @Override
>         public Integer getKey(String value) throws Exception {
>             return 0;
>         }
>     }
> }
> {code}
>
> h1. 3. Solution
> The implemented solution was to make the state redistribution logic more
> granular by checking for in-flight data on a *per-exchange* basis instead of
> a per-task basis.
> # *Precise State Tracking:* The {{TaskStateAssignment}} class was refactored
> to no longer use a simple boolean flag. It now precisely tracks which
> specific input gates and result partitions contain in-flight data.
> # *Per-Channel/Partition Checks:* The core redistribution methods,
> {{reDistributeInputChannelStates}} and
> {{{}reDistributeResultSubpartitionStates{}}}, were modified. Their internal
> logic now iterates through each input gate or output partition and uses new
> helper methods ({{{}hasInFlightDataForInputGate{}}} and
> {{{}hasInFlightDataForResultPartition{}}}) to check if that _specific
> channel_ has state.
> # *Conditional Logic:* The state redistribution logic is now wrapped in a
> conditional block. It is only invoked for a channel if the per-exchange check
> passes. This ensures that stateless exchanges (like {{forward}} or
> {{{}rescale{}}}) are correctly skipped, avoiding the exception.
> This approach fixes the bug by applying the redistribution logic only where
> it is actually needed, allowing jobs with mixed partitioner types to rescale
> from an unaligned checkpoint successfully.
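> The per-exchange idea can be illustrated with the sketch below. It is not the
> code from the fix: the {{Exchange}} class, {{hasInFlightData}}, and
> {{exchangesToRedistribute}} are hypothetical names, and the buffer counts are
> assumed values for the example DAG.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-exchange model; none of these names are Flink's. */
public class PerExchangeCheckSketch {

    static final class Exchange {
        final String name;
        final boolean pointwise;   // true for forward or rescale
        final int inFlightBuffers; // buffers recorded in the checkpoint

        Exchange(String name, boolean pointwise, int inFlightBuffers) {
            this.name = name;
            this.pointwise = pointwise;
            this.inFlightBuffers = inFlightBuffers;
        }

        boolean hasInFlightData() {
            return inFlightBuffers > 0;
        }
    }

    /** Returns the names of the exchanges whose in-flight data must be redistributed. */
    static List<String> exchangesToRedistribute(List<Exchange> exchanges) {
        List<String> selected = new ArrayList<>();
        for (Exchange exchange : exchanges) {
            if (!exchange.hasInFlightData()) {
                continue; // stateless exchange: nothing to redistribute
            }
            if (exchange.pointwise) {
                // Models the failure reported in this ticket for pointwise exchanges.
                throw new UnsupportedOperationException(
                        "Cannot rescale the given pointwise partitioner.");
            }
            selected.add(exchange.name);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Exchange> sourceOutputs = List.of(
                new Exchange("hash", false, 12),
                new Exchange("forward", true, 0));
        // Only the hash exchange is selected; the forward exchange is skipped.
        System.out.println(exchangesToRedistribute(sourceOutputs));
    }
}
```

> Because the check is evaluated per exchange, the forward exchange never
> reaches the rescale logic in this model, no matter how much state the hash
> exchange carries.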
>
> [1]
> [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
> [2]
> [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]