[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348362#comment-16348362
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r165315625
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -859,56 +861,77 @@ public void run() {
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                        
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
     
    -                                   TaskStateSnapshot acknowledgedState = 
hasState ? taskOperatorSubtaskStates : null;
    -
    -                                   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
    -
    -                                   // we signal stateless tasks by 
reporting null, so that there are no attempts to assign empty state
    -                                   // to stateless tasks on restore. This 
enables simple job modifications that only concern
    -                                   // stateless without the need to assign 
them uids to match their (always empty) states.
    -                                   
taskStateManager.reportTaskStateSnapshot(
    -                                           checkpointMetaData,
    -                                           checkpointMetrics,
    -                                           acknowledgedState);
    -
    -                                   LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms",
    -                                           owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
    -
    -                                   LOG.trace("{} - reported the following 
states in snapshot for checkpoint {}: {}.",
    -                                           owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedState);
    +                                   reportCompletedSnapshotStates(
    +                                           
jobManagerTaskOperatorSubtaskStates,
    +                                           localTaskOperatorSubtaskStates,
    +                                           asyncDurationMillis);
     
                                } else {
                                        LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed because it was closed before.",
                                                owner.getName(),
                                                
checkpointMetaData.getCheckpointId());
                                }
                        } catch (Exception e) {
    -                           // the state is completed if an exception 
occurred in the acknowledgeCheckpoint call
    -                           // in order to clean up, we have to set it to 
RUNNING again.
    -                           asyncCheckpointState.compareAndSet(
    -                                   
CheckpointingOperation.AsynCheckpointState.COMPLETED,
    -                                   
CheckpointingOperation.AsynCheckpointState.RUNNING);
    -
    -                           try {
    -                                   cleanup();
    -                           } catch (Exception cleanupException) {
    -                                   e.addSuppressed(cleanupException);
    -                           }
    -
    -                           Exception checkpointException = new Exception(
    -                                   "Could not materialize checkpoint " + 
checkpointId + " for operator " +
    -                                           owner.getName() + '.',
    -                                   e);
    -
    -                           
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
    -                                   checkpointMetaData,
    -                                   checkpointException);
    +                           handleExecutionException(e);
                        } finally {
                                owner.cancelables.unregisterCloseable(this);
                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                        }
                }
     
    +           private void reportCompletedSnapshotStates(
    +                   TaskStateSnapshot acknowledgedTaskStateSnapshot,
    +                   TaskStateSnapshot localTaskStateSnapshot,
    +                   long asyncDurationMillis) {
    +
    +                   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
    +
    +                   boolean hasAckState = 
acknowledgedTaskStateSnapshot.hasState();
    +                   boolean hasLocalState = 
localTaskStateSnapshot.hasState();
    +
    +                   Preconditions.checkState(hasAckState || !hasLocalState,
    +                           "Found cached state but no corresponding 
primary state is reported to the job " +
    +                                   "manager. This indicates a problem.");
    +
    +                   // we signal stateless tasks by reporting null, so that 
there are no attempts to assign empty state
    +                   // to stateless tasks on restore. This enables simple 
job modifications that only concern
    +                   // stateless without the need to assign them uids to 
match their (always empty) states.
    +                   taskStateManager.reportTaskStateSnapshots(
    +                           checkpointMetaData,
    +                           checkpointMetrics,
    +                           hasAckState ? acknowledgedTaskStateSnapshot : 
null,
    +                           hasLocalState ? localTaskStateSnapshot : null);
    +
    +                   LOG.debug("{} - finished asynchronous part of 
checkpoint {}. Asynchronous duration: {} ms",
    +                           owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
    +
    +                   LOG.trace("{} - reported the following states in 
snapshot for checkpoint {}: {}.",
    +                           owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
    +           }
    +
    +           private void handleExecutionException(Exception e) {
    +                   // the state is completed if an exception occurred in 
the acknowledgeCheckpoint call
    --- End diff --
    
    The comment needs to be adapted.


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to