[ 
https://issues.apache.org/jira/browse/FLINK-38357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinglong Wang updated FLINK-38357:
----------------------------------
    Description: 
h2. Problem

Given a StreamTask chained as {{map -> (filter -> Sink: sink-r, map-q)}}, the tailOperator/outputOperator is resolved to {{Sink: sink-r}}:

```
// TaskStateAssignment: the output operator is taken to be the operator at index 0
outputOperatorID = operatorIDs.get(0).getGeneratedOperatorID();

// OperatorChain: the tail operator is likewise the wrapper at index 0
this.tailOperatorWrapper = allOpWrappers.get(0);
```

In reality, however, {{map-q}} is the operator that is connected to the downstream Task.

If the parallelism is changed in just the right way, the chain is split differently (a minimal job sketch with this shape is given below):
 * the StreamTask becomes {{map -> (filter, map-q)}},
 * and {{Sink: sink-r}} becomes a separate downstream StreamTask: {{Sink: sink-r}}.
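For reference, a minimal DataStream job of this shape. The operator names follow the ticket; the source, the print sinks standing in for {{sink-r}}/{{sink-q}}, and the checkpoint settings are assumptions for illustration (the channel-state restore path in the stack trace below is only exercised with unaligned checkpoints):

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink38357ReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);
        // Channel state is only written/restored for unaligned checkpoints.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        SingleOutputStreamOperator<Long> mapped = env
                .fromSequence(0, Long.MAX_VALUE)
                .map(v -> v).returns(Types.LONG).name("map");

        // Branch 1: filter -> Sink: sink-r, chained with "map" while parallelisms match.
        mapped.filter(v -> v % 2 == 0).name("filter")
                .print().name("sink-r");

        // Branch 2: map-q, whose output crosses a network edge to a downstream StreamTask.
        mapped.map(v -> v + 1).returns(Types.LONG).name("map-q")
                .rebalance()
                .print().name("sink-q");

        env.execute("FLINK-38357 repro sketch");
    }
}
```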

In this situation, ResultSubpartitionRecoveredStateHandler#getSubpartition throws ArrayIndexOutOfBoundsException: 0 while restoring channel state, since the Sink task has no result partition writers.

```
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getSubpartition(RecoveredChannelStateHandler.java:217) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.lambda$calculateMapping$1(RecoveredChannelStateHandler.java:237) ~[flink-dist-1.16.1.jar:1.16.1]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_312]
    at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_312]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_312]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_312]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_312]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.calculateMapping(RecoveredChannelStateHandler.java:238) ~[flink-dist-1.16.1.jar:1.16.1]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1128) ~[?:1.8.0_312]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getMappedChannels(RecoveredChannelStateHandler.java:227) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getBuffer(RecoveredChannelStateHandler.java:182) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getBuffer(RecoveredChannelStateHandler.java:157) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateChunkReader.readChunk(SequentialChannelStateReaderImpl.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.readSequentially(SequentialChannelStateReaderImpl.java:109) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.read(SequentialChannelStateReaderImpl.java:95) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.readOutputData(SequentialChannelStateReaderImpl.java:81) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:735) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:713) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:679) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:906) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.1.jar:1.16.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
```

 
h2. Analysis

When a Task contains multiple output operators, the current implementation treats only one of them as the tailOperator/outputOperator. If that single tailOperator/outputOperator resolves to the Sink, restoring the channel state of the ResultPartition fails: indexing into the task's ResultPartitionWriter array throws an "index 0 out of bounds" exception, because a Sink task has no writers.
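To make the failure concrete, here is a self-contained sketch of the doomed array lookup. The class, field, and method names are simplified, hypothetical stand-ins, not the actual Flink types:

```
/** Hypothetical, simplified stand-in for the recovered-channel-state lookup. */
final class SubpartitionLookupSketch {

    /** The task's local result partition writers; empty for a Sink task. */
    private final Object[] writers;

    SubpartitionLookupSketch(Object[] writers) {
        this.writers = writers;
    }

    /** Mirrors the failing access: indexing into an empty writers array. */
    Object getSubpartition(int partitionIdx) {
        // A Sink task produces no output data, so writers.length == 0 here,
        // and writers[0] throws ArrayIndexOutOfBoundsException: 0.
        return writers[partitionIdx];
    }

    public static void main(String[] args) {
        SubpartitionLookupSketch sinkTask = new SubpartitionLookupSketch(new Object[0]);
        sinkTask.getSubpartition(0); // -> ArrayIndexOutOfBoundsException: 0
    }
}
```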

 
h2. Extension

If there are multiple output operators, each connected to a different downstream task, the current Flink implementation's assumption of a single tailOperator/outputOperator breaks down. For example, in a job with three outputs:

!image-2025-09-15-21-34-19-517.png!

The mapping between ResultPartition and output operator needs to be more fine-grained so that the relationship required for channel state restoration is retained correctly (a rough, purely illustrative sketch follows below).
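As a purely illustrative sketch (hypothetical types and names, not code from the Flink code base and not a concrete proposal), such a finer-grained mapping could associate each produced result partition with the operator that actually feeds it:

```
import java.util.Map;

/** Hypothetical sketch of a per-partition output-operator mapping. */
final class OutputMappingSketch {

    /** Partition index of the task -> ID/name of the operator that writes to it. */
    private final Map<Integer, String> partitionToOutputOperator;

    OutputMappingSketch(Map<Integer, String> partitionToOutputOperator) {
        this.partitionToOutputOperator = partitionToOutputOperator;
    }

    /** Channel-state restore could then resolve the owning operator per partition, not per task. */
    String outputOperatorFor(int partitionIdx) {
        return partitionToOutputOperator.get(partitionIdx);
    }

    public static void main(String[] args) {
        // Made-up operator names for a task with three outputs.
        OutputMappingSketch mapping = new OutputMappingSketch(
                Map.of(0, "map-q", 1, "output-b", 2, "output-c"));
        System.out.println(mapping.outputOperatorFor(0)); // map-q
    }
}
```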



> ResultSubpartitionRecoveredStateHandler.getSubpartition 
> ArrayIndexOutOfBoundException 0
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-38357
>                 URL: https://issues.apache.org/jira/browse/FLINK-38357
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.1, 2.1.0
>            Reporter: Xinglong Wang
>            Priority: Major
>         Attachments: image-2025-09-15-21-34-19-517.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
