[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367402#comment-16367402 ]

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168774627
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.state.OperatorStateHandle;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.StateObject;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.function.BiFunction;
    +
    +/**
    + * This class is a wrapper over multiple alternative {@link OperatorSubtaskState} that are (partial) substitutes for
    + * each other and imposes a priority ordering over all alternatives for the different states which define an order in
    + * which the operator should attempt to restore the state from them. One OperatorSubtaskState is considered as the
    + * "ground truth" about which state should be represented. Alternatives may be complete or partial substitutes for
    + * the "ground truth" with a higher priority (if they had a lower alternative, they would not really be alternatives).
    + * Substitution is determined on a per-sub-state basis.
    + */
    +public class PrioritizedOperatorSubtaskState {
    +
    +   /** Singleton instance for an empty, non-restored operator state. */
    +   private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE =
    +           new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), Collections.emptyList(), false);
    +
    +   /** List of prioritized snapshot alternatives for managed operator state. */
    +   private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState;
    +
    +   /** List of prioritized snapshot alternatives for raw operator state. */
    +   private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState;
    +
    +   /** List of prioritized snapshot alternatives for managed keyed state. */
    +   private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState;
    +
    +   /** List of prioritized snapshot alternatives for raw keyed state. */
    +   private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState;
    +
    +   /** Signal flag if this represents state for a restored operator. */
    +   private final boolean restored;
    +
    +   public PrioritizedOperatorSubtaskState(
    +           @Nonnull OperatorSubtaskState jobManagerState,
    +           @Nonnull List<OperatorSubtaskState> alternativesByPriority) {
    +           this(jobManagerState, alternativesByPriority, true);
    +   }
    +
    +   public PrioritizedOperatorSubtaskState(
    +           @Nonnull OperatorSubtaskState jobManagerState,
    +           @Nonnull List<OperatorSubtaskState> alternativesByPriority,
    +           boolean restored) {
    +
    +           Preconditions.checkNotNull(jobManagerState, "Job manager state is null.");
    +           int size = Preconditions.checkNotNull(alternativesByPriority, "Alternative states are null.").size();
    +
    +           this.restored = restored;
    +
    +           List<StateObjectCollection<OperatorStateHandle>> managedOperatorAlternatives = new ArrayList<>(size);
    +           List<StateObjectCollection<KeyedStateHandle>> managedKeyedAlternatives = new ArrayList<>(size);
    +           List<StateObjectCollection<OperatorStateHandle>> rawOperatorAlternatives = new ArrayList<>(size);
    +           List<StateObjectCollection<KeyedStateHandle>> rawKeyedAlternatives = new ArrayList<>(size);
    +
    +           for (OperatorSubtaskState subtaskState : alternativesByPriority) {
    +
    +                   if (subtaskState != null) {
    +                           managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
    +                           rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
    +                           managedOperatorAlternatives.add(subtaskState.getManagedOperatorState());
    +                           rawOperatorAlternatives.add(subtaskState.getRawOperatorState());
    +                   }
    +           }
    +
    +           // Key-groups should match.
    +           BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> keyedStateApprover =
    +                   (ref, alt) -> ref.getKeyGroupRange().equals(alt.getKeyGroupRange());
    +
    --- End diff --
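The quoted diff stops right after `keyedStateApprover` is defined, so it does not show how the approver is applied. The Javadoc says that alternatives take priority over the "ground truth" and that substitution is decided per sub-state. A minimal sketch of that idea for a single sub-state type might look like the following. `PerSubStateResolver` and `resolve` are hypothetical names, plain generic lists stand in for Flink's `StateObjectCollection`, and this is an illustration under those assumptions, not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper; not part of Flink. */
final class PerSubStateResolver {

    private PerSubStateResolver() {
    }

    /**
     * Builds the priority list for ONE sub-state type (e.g. managed keyed state).
     * Approved alternatives keep their relative priority order and come first;
     * the ground-truth (job manager) state is always appended last as the fallback.
     */
    static <T> List<T> resolve(
            T groundTruth,
            List<T> alternativesByPriority,
            BiFunction<T, T, Boolean> approver) {

        List<T> result = new ArrayList<>(alternativesByPriority.size() + 1);
        for (T alternative : alternativesByPriority) {
            // Skip missing alternatives and those the approver rejects (null counts as rejected).
            if (alternative != null && Boolean.TRUE.equals(approver.apply(groundTruth, alternative))) {
                result.add(alternative);
            }
        }
        // The ground truth is the lowest-priority entry, so restore can always fall back to it.
        result.add(groundTruth);
        return Collections.unmodifiableList(result);
    }
}
```

Because the resolution runs separately for each sub-state type, an alternative can substitute for, say, managed keyed state while the operator state still comes from the ground truth.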
    
    Why do `ref.getKeyGroupRange()` and `alt.getKeyGroupRange()` need to be strictly equal? It seems that `alt.getKeyGroupRange().getStartKeyGroup() <= ref.getKeyGroupRange().getStartKeyGroup() && alt.getKeyGroupRange().getEndKeyGroup() >= ref.getKeyGroupRange().getEndKeyGroup()` (i.e. the alternative's range contains the reference range) would also be acceptable.
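To make the two candidate checks concrete, the sketch below puts the strict-equality approver from the diff next to the containment check proposed in the comment. `SimpleKeyGroupRange` and `KeyGroupApprovers` are hypothetical stand-ins (an inclusive `[start, end]` range of key-group indices), not Flink's actual `KeyGroupRange` class.

```java
import java.util.function.BiFunction;

/** Hypothetical stand-in for Flink's KeyGroupRange: inclusive range [start, end]. */
final class SimpleKeyGroupRange {
    final int start;
    final int end;

    SimpleKeyGroupRange(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("start must not be greater than end");
        }
        this.start = start;
        this.end = end;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleKeyGroupRange)) {
            return false;
        }
        SimpleKeyGroupRange other = (SimpleKeyGroupRange) o;
        return start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
        return 31 * start + end;
    }
}

/** Hypothetical holder for the two approver variants discussed in the review. */
final class KeyGroupApprovers {

    private KeyGroupApprovers() {
    }

    // As in the diff: the alternative must cover exactly the reference range.
    static final BiFunction<SimpleKeyGroupRange, SimpleKeyGroupRange, Boolean> STRICT =
            (ref, alt) -> ref.equals(alt);

    // As suggested in the comment: the alternative may cover a superset of the reference range.
    static final BiFunction<SimpleKeyGroupRange, SimpleKeyGroupRange, Boolean> CONTAINS =
            (ref, alt) -> alt.start <= ref.start && alt.end >= ref.end;
}
```

The containment check accepts more alternatives, but an accepted alternative may then hold key groups the subtask does not own, which the restore path would presumably have to filter out; that may be one reason to prefer strict equality here.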


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.
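The recovery order described in the issue (try the secondary local copy first, fall back to the primary copy in DFS) can be sketched as a first-success loop over candidates in priority order. `PrioritizedRestore`, `restoreFirstAvailable`, and the `tryRestore` predicate are hypothetical names used only to illustrate the ordering; they are not Flink API.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical illustration of prioritized recovery; not part of Flink. */
final class PrioritizedRestore {

    private PrioritizedRestore() {
    }

    /**
     * Tries each candidate in priority order (e.g. local copy first, DFS copy last)
     * and returns the first one whose restore attempt succeeds.
     * Missing (null) candidates are skipped; returns empty if every attempt fails.
     */
    static <S> Optional<S> restoreFirstAvailable(List<S> candidatesByPriority, Predicate<S> tryRestore) {
        for (S candidate : candidatesByPriority) {
            if (candidate != null && tryRestore.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

For example, with candidates ordered as local copy then DFS copy, a failed or missing local copy makes the loop move on to the DFS copy, which is what keeps the DFS primary copy as the fallback.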



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
