This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c787fcf3a60a484b159799eb4f326f248f9a6dbd Author: Rui Fan <[email protected]> AuthorDate: Mon Nov 3 22:01:10 2025 +0100 [hotfix][checkpoint] Refactor output buffers distribution logic via ResultSubpartitionDistributor --- .../channel/RecoveredChannelStateHandler.java | 55 +++----- .../channel/ResultSubpartitionDistributor.java | 73 +++++++++++ .../channel/ResultSubpartitionDistributorTest.java | 139 +++++++++++++++++++++ 3 files changed, 228 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java index 3177bd5f9f4..7a06f52b145 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java @@ -158,19 +158,27 @@ class ResultSubpartitionRecoveredStateHandler private final ResultPartitionWriter[] writers; private final boolean notifyAndBlockOnCompletion; - - private final InflightDataRescalingDescriptor channelMapping; - - private final Map<ResultSubpartitionInfo, List<ResultSubpartitionInfo>> rescaledChannels = - new HashMap<>(); - private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<>(); + private final ResultSubpartitionDistributor resultSubpartitionDistributor; ResultSubpartitionRecoveredStateHandler( ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor channelMapping) { this.writers = writers; - this.channelMapping = channelMapping; + this.resultSubpartitionDistributor = + new ResultSubpartitionDistributor(channelMapping) { + /** + * Override the getSubpartitionInfo to perform type checking on the + * ResultPartitionWriter. 
+ */ + @Override + ResultSubpartitionInfo getSubpartitionInfo( + int partitionIndex, int subPartitionIdx) { + CheckpointedResultPartition writer = + getCheckpointedResultPartition(partitionIndex); + return writer.getCheckpointedSubpartitionInfo(subPartitionIdx); + } + }; this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion; } @@ -197,7 +205,7 @@ class ResultSubpartitionRecoveredStateHandler return; } final List<ResultSubpartitionInfo> mappedSubpartitions = - getMappedSubpartitions(subpartitionInfo); + resultSubpartitionDistributor.getMappedSubpartitions(subpartitionInfo); CheckpointedResultPartition checkpointedResultPartition = getCheckpointedResultPartition(subpartitionInfo.getPartitionIdx()); for (final ResultSubpartitionInfo mappedSubpartition : mappedSubpartitions) { @@ -215,11 +223,6 @@ class ResultSubpartitionRecoveredStateHandler } } - private ResultSubpartitionInfo getSubpartitionInfo(int partitionIndex, int subPartitionIdx) { - CheckpointedResultPartition writer = getCheckpointedResultPartition(partitionIndex); - return writer.getCheckpointedSubpartitionInfo(subPartitionIdx); - } - private CheckpointedResultPartition getCheckpointedResultPartition(int partitionIndex) { ResultPartitionWriter writer = writers[partitionIndex]; if (!(writer instanceof CheckpointedResultPartition)) { @@ -229,32 +232,6 @@ class ResultSubpartitionRecoveredStateHandler return (CheckpointedResultPartition) writer; } - private List<ResultSubpartitionInfo> getMappedSubpartitions( - ResultSubpartitionInfo subpartitionInfo) { - return rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping); - } - - private List<ResultSubpartitionInfo> calculateMapping(ResultSubpartitionInfo info) { - final RescaleMappings oldToNewMapping = - oldToNewMappings.computeIfAbsent( - info.getPartitionIdx(), - idx -> channelMapping.getChannelMapping(idx).invert()); - final List<ResultSubpartitionInfo> subpartitions = - 
Arrays.stream(oldToNewMapping.getMappedIndexes(info.getSubPartitionIdx())) - .mapToObj( - newIndexes -> - getSubpartitionInfo(info.getPartitionIdx(), newIndexes)) - .collect(Collectors.toList()); - if (subpartitions.isEmpty()) { - throw new IllegalStateException( - "Recovered a buffer from old " - + info - + " that has no mapping in " - + channelMapping.getChannelMapping(info.getPartitionIdx())); - } - return subpartitions; - } - @Override public void close() throws IOException { for (ResultPartitionWriter writer : writers) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java new file mode 100644 index 00000000000..5c7910d9c50 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Distributes the recovered channel state of an old result subpartition to the new result subpartition(s) that should receive it. <p>Targets are derived by inverting the per-partition channel mapping of the given {@link InflightDataRescalingDescriptor}; both the inverted mappings and the computed target lists are cached. <p>NOTE(review): the caches are plain {@link HashMap}s without synchronization, so an instance is presumably used from a single thread only; confirm with callers. */ +public class ResultSubpartitionDistributor { + + /** Rescaling descriptor whose per-partition channel mapping is inverted to obtain old-to-new targets; also printed in the error message for unmapped subpartitions. */ private final InflightDataRescalingDescriptor channelMapping; + + /** Cache of computed target lists, keyed by the old subpartition. */ private final Map<ResultSubpartitionInfo, List<ResultSubpartitionInfo>> rescaledChannels = + new HashMap<>(); + /** Cache of inverted (old-to-new) channel mappings, keyed by partition index. */ private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<>(); + + /** Creates a distributor for the given rescaling descriptor. No mapping is computed until {@link #getMappedSubpartitions} is first called. */ public ResultSubpartitionDistributor(InflightDataRescalingDescriptor channelMapping) { + this.channelMapping = channelMapping; + } + + /** Returns the new subpartitions that state recovered for the given old subpartition should be written to. 
<p>The result is computed on the first request for a key and cached, so repeated calls with an equal key return the same list instance; callers should not modify it. @param subpartitionInfo the old subpartition the recovered state belongs to @return a non-empty list of target subpartitions @throws IllegalStateException if the channel mapping has no target for {@code subpartitionInfo} (nothing is cached in that case) */ public List<ResultSubpartitionInfo> getMappedSubpartitions( + ResultSubpartitionInfo subpartitionInfo) { + return rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping); + } + + /** Computes the targets of one old subpartition: looks up (and caches) the inverted channel mapping of its partition, then converts every mapped index through {@link #getSubpartitionInfo}, keeping the original partition index. Throws {@link IllegalStateException} if no index is mapped. */ private List<ResultSubpartitionInfo> calculateMapping(ResultSubpartitionInfo info) { + final RescaleMappings oldToNewMapping = + oldToNewMappings.computeIfAbsent( + info.getPartitionIdx(), + idx -> channelMapping.getChannelMapping(idx).invert()); + final List<ResultSubpartitionInfo> subpartitions = + Arrays.stream(oldToNewMapping.getMappedIndexes(info.getSubPartitionIdx())) + .mapToObj( + /* despite the plural name, each element is a single new subpartition index */ newIndexes -> + getSubpartitionInfo(info.getPartitionIdx(), newIndexes)) + .collect(Collectors.toList()); + /* a recovered buffer without any target subpartition would have nowhere to go, so fail fast instead of dropping it */ if (subpartitions.isEmpty()) { + throw new IllegalStateException( + "Recovered a buffer from old " + + info + + " that has no mapping in " + + channelMapping.getChannelMapping(info.getPartitionIdx())); + } + return subpartitions; + } + + /** Creates the info for target subpartition {@code subPartitionIdx} of partition {@code partitionIndex}. 
Package-private and non-final so that it can be overridden; {@code ResultSubpartitionRecoveredStateHandler} overrides it to obtain the info from the corresponding {@code CheckpointedResultPartition} instead. */ ResultSubpartitionInfo getSubpartitionInfo(int partitionIndex, int subPartitionIdx) { + return new ResultSubpartitionInfo(partitionIndex, subPartitionIdx); + } +} diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java new file mode 100644 index 00000000000..d58ce647029 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; + +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.List; + +import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings; +import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link ResultSubpartitionDistributor}. 
*/ +class ResultSubpartitionDistributorTest { + + /** Old subpartition (partition 0, subpartition 0) fed to the distributor in every test. */ private static final int DEFAULT_PARTITION_INDEX = 0; + private static final int DEFAULT_SUBPARTITION_INDEX = 0; + + /** Builds a descriptor with a single partition whose channel mapping is {@code RescaleMappings.identity(1, 1)} and whose mapping type is IDENTITY. */ private InflightDataRescalingDescriptor createIdentityMapping() { + return new InflightDataRescalingDescriptor( + new InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor + [] { + new InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor( + new int[] {1}, + RescaleMappings.identity(1, 1), + new HashSet<>(), + InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor.MappingType + .IDENTITY) + }); + } + + /** Builds a descriptor with a single partition whose channel mapping is {@code mappings} and whose mapping type is RESCALING. */ private InflightDataRescalingDescriptor createRescalingMapping(RescaleMappings mappings) { + return new InflightDataRescalingDescriptor( + new InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor + [] { + new InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor( + new int[] {2}, + mappings, + new HashSet<>(), + InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor.MappingType + .RESCALING) + }); + } + + /** An identity mapping routes the old subpartition to exactly one target with the same indices. */ @Test + void testGetMappedSubpartitionsIdentityMapping() { + InflightDataRescalingDescriptor identityMapping = createIdentityMapping(); + ResultSubpartitionDistributor distributor = + new ResultSubpartitionDistributor(identityMapping); + ResultSubpartitionInfo inputInfo = + new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX); + + List<ResultSubpartitionInfo> mappedSubpartitions = + 
distributor.getMappedSubpartitions(inputInfo); + + // Identity mapping should preserve original indices + assertThat(mappedSubpartitions).hasSize(1); + assertThat(mappedSubpartitions.get(0).getPartitionIdx()).isEqualTo(DEFAULT_PARTITION_INDEX); + assertThat(mappedSubpartitions.get(0).getSubPartitionIdx()) + .isEqualTo(DEFAULT_SUBPARTITION_INDEX); + } + + /** A RESCALING mapping yields at least one target, all within the original partition. */ @Test + void testGetMappedSubpartitionsRescaling() { + // Test rescaling scenario
where one input maps to multiple outputs + RescaleMappings rescaleMappings = mappings(to(0, 1)); /* NOTE(review): the distributor inverts getChannelMapping(), which suggests that mapping is new-to-old. If so, to(0, 1) means a single new subpartition reads old subpartitions 0 and 1, so old subpartition 0 has exactly one target and the wording above (one input maps to multiple outputs) may be reversed. The assertions below only check non-emptiness and the partition index; consider asserting the exact target list. Confirm against InflightDataRescalingDescriptorUtil.mappings. */ + InflightDataRescalingDescriptor rescalingMapping = createRescalingMapping(rescaleMappings); + ResultSubpartitionDistributor distributor = + new ResultSubpartitionDistributor(rescalingMapping); + ResultSubpartitionInfo inputInfo = + new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX); + + List<ResultSubpartitionInfo> mappedSubpartitions = + distributor.getMappedSubpartitions(inputInfo); + + // Rescaling preserves partition index but may change subpartition mapping + assertThat(mappedSubpartitions).isNotEmpty(); + assertThat(mappedSubpartitions) + .allMatch(info -> info.getPartitionIdx() == DEFAULT_PARTITION_INDEX); + } + + /** Repeated lookups for the same old subpartition return the cached list instance. 
*/ @Test + void testMappingCacheConsistency() { + // Verify caching behavior to ensure performance optimization + InflightDataRescalingDescriptor identityMapping = createIdentityMapping(); + ResultSubpartitionDistributor distributor = + new ResultSubpartitionDistributor(identityMapping); + ResultSubpartitionInfo inputInfo = + new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX); + + List<ResultSubpartitionInfo> firstCall = distributor.getMappedSubpartitions(inputInfo); + List<ResultSubpartitionInfo> secondCall = distributor.getMappedSubpartitions(inputInfo); + + // Cache should return same instance + assertThat(firstCall).isEqualTo(secondCall).isSameAs(secondCall); + } + + /** An old subpartition with no target in the channel mapping makes the distributor throw an IllegalStateException carrying its no-mapping message. */ @Test + void testInvalidMappingThrowsException() { + // Test error handling when mapping configuration is inconsistent + RescaleMappings mappingsWithNoTarget = mappings(); /* NOTE(review): relies on the inverted empty mapping reporting no targets for subpartition 0 (rather than failing with an index error); confirm RescaleMappings.getMappedIndexes behavior. */ + InflightDataRescalingDescriptor invalidMapping = + createRescalingMapping(mappingsWithNoTarget); + ResultSubpartitionDistributor distributor = + new ResultSubpartitionDistributor(invalidMapping); + ResultSubpartitionInfo inputInfo = + new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX); + + assertThatThrownBy(() ->
distributor.getMappedSubpartitions(inputInfo)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Recovered a buffer from old") + .hasMessageContaining("that has no mapping in"); + } +}
