Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168777749 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; + +/** + * This class is a wrapper over multiple alternative {@link OperatorSubtaskState} that are (partial) substitutes for + * each other and imposes a priority ordering over all alternatives for the different states which define an order in + * which the operator should attempt to restore the state from them. One OperatorSubtaskState is considered as the + * "ground truth" about which state should be represented. Alternatives may be complete or partial substitutes for + * the "ground truth" with a higher priority (if they had a lower alternative, they would not really be alternatives). + * Substitution is determined on a per-sub-state basis. + */ +public class PrioritizedOperatorSubtaskState { + + /** Singleton instance for an empty, non-restored operator state. */ + private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE = + new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), Collections.emptyList(), false); + + /** List of prioritized snapshot alternatives for managed operator state. */ + private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState; + + /** List of prioritized snapshot alternatives for raw operator state. */ + private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState; + + /** List of prioritized snapshot alternatives for managed keyed state. */ + private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState; + + /** List of prioritized snapshot alternatives for raw keyed state. */ + private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState; + + /** Signal flag if this represents state for a restored operator. */ + private final boolean restored; + + public PrioritizedOperatorSubtaskState( + @Nonnull OperatorSubtaskState jobManagerState, + @Nonnull List<OperatorSubtaskState> alternativesByPriority) { + this(jobManagerState, alternativesByPriority, true); + } + + public PrioritizedOperatorSubtaskState( + @Nonnull OperatorSubtaskState jobManagerState, + @Nonnull List<OperatorSubtaskState> alternativesByPriority, + boolean restored) { + + Preconditions.checkNotNull(jobManagerState, "Job manager state is null."); + int size = Preconditions.checkNotNull(alternativesByPriority, "Alternative states are null.").size(); + + this.restored = restored; + + List<StateObjectCollection<OperatorStateHandle>> managedOperatorAlternatives = new ArrayList<>(size); + List<StateObjectCollection<KeyedStateHandle>> managedKeyedAlternatives = new ArrayList<>(size); + List<StateObjectCollection<OperatorStateHandle>> rawOperatorAlternatives = new ArrayList<>(size); + List<StateObjectCollection<KeyedStateHandle>> rawKeyedAlternatives = new ArrayList<>(size); + + for (OperatorSubtaskState subtaskState : alternativesByPriority) { + + if (subtaskState != null) { + managedKeyedAlternatives.add(subtaskState.getManagedKeyedState()); + rawKeyedAlternatives.add(subtaskState.getRawKeyedState()); + managedOperatorAlternatives.add(subtaskState.getManagedOperatorState()); + rawOperatorAlternatives.add(subtaskState.getRawOperatorState()); + } + } + + // Key-groups should match. + BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> keyedStateApprover = + (ref, alt) -> ref.getKeyGroupRange().equals(alt.getKeyGroupRange()); + --- End diff -- In theory, we could be less strict, yes. But in practice it doesn't matter because rescaling is not supported right now. See my other comment.
---