[ 
https://issues.apache.org/jira/browse/FLINK-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834202#comment-15834202
 ] 

ASF GitHub Bot commented on FLINK-5229:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3179#discussion_r97285679
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -954,6 +954,27 @@ public void run() {
                                                        owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
                                }
                        } catch (Exception e) {
    +                           // clean up ongoing operator snapshot results and non partitioned state handles
    +                           for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
    +                                   if (operatorSnapshotResult != null) {
    +                                           try {
    +                                                   operatorSnapshotResult.cancel();
    +                                           } catch (Exception cancelException) {
    +                                                   e.addSuppressed(cancelException);
    +                                           }
    +                                   }
    +                           }
    +
    +                           for (StreamStateHandle nonPartitionedStateHandle : nonPartitionedStateHandles) {
    --- End diff --
    
    You could use the utility method `StateUtil.bestEffortDiscardAllStateObjects(...)` to reduce code duplication a bit.
    
    I think we should also discard any other state objects that completed futures could have created, e.g. `StateUtil.bestEffortDiscardAllStateObjects(operatorStatesBackend);` etc. What do you think?
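
A minimal sketch of the best-effort cleanup pattern suggested above. It does not use Flink's actual `StateUtil` or state handle types, whose signatures are not shown in this thread; `DiscardableState` and `CleanupSketch.discardAllBestEffort` are hypothetical stand-ins introduced only for illustration. The idea is to attempt every discard, keep going after a failure, and attach each failure to the original exception as a suppressed exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a discardable state object; NOT Flink's real interface.
interface DiscardableState {
    void discard() throws Exception;
}

// Hypothetical helper class; the name and signature are assumptions for this sketch.
final class CleanupSketch {

    private CleanupSketch() {}

    /**
     * Tries to discard every non-null element and continues past failures.
     * Each discard failure is added to {@code cause} as a suppressed exception.
     *
     * @return the number of elements that were discarded without error
     */
    static int discardAllBestEffort(
            Iterable<? extends DiscardableState> states, Exception cause) {
        int discarded = 0;
        for (DiscardableState state : states) {
            if (state == null) {
                continue; // a slot may be empty if no state was produced for it
            }
            try {
                state.discard();
                discarded++;
            } catch (Exception discardException) {
                // keep the original failure as the primary exception
                cause.addSuppressed(discardException);
            }
        }
        return discarded;
    }
}
```

In the `catch (Exception e)` block of the diff, each hand-written loop would then shrink to a single call such as `CleanupSketch.discardAllBestEffort(handles, e)`, assuming the handles implement an interface comparable to `DiscardableState`.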



> Cleanup StreamTaskStates if a checkpoint operation of a subsequent operator fails
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-5229
>                 URL: https://issues.apache.org/jira/browse/FLINK-5229
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing, TaskManager
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> Due to chaining, a {{StreamTask}} needs to checkpoint multiple operators. If 
> the first operators succeed in creating a checkpoint but a subsequent 
> operator in the chain fails, the {{StreamTask}} has to clean up the already 
> completed checkpoints. Otherwise we might end up with orphaned state data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
