[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569936#comment-17569936
 ] 

Jinzhong Li edited comment on FLINK-28515 at 7/22/22 10:38 AM:
---------------------------------------------------------------

Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That is right. In this case, closeSnapshotIO() closes all the 
CheckpointStreams that belong to the folder, but it does not delete the folder.

 

1. If the cleanup() method is skipped here, it will still be invoked in the 
finally block of 
[AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete a folder that is still in 
the ONGOING state, not one that is COMPLETED. (Call chain: 
AsyncSnapshotTask.cleanup() -> IncrementalRocksDBSnapshotResources.release -> 
SnapshotDirectory.cleanup)
{code:java}
public boolean cleanup() throws IOException {
    // Only a directory that is still ONGOING is deleted; a COMPLETED one is left as is.
    if (state.compareAndSet(State.ONGOING, State.DELETED)) {
        FileUtils.deleteDirectory(directory.toFile());
    }
    return true;
}
{code}
3. AsyncSnapshotTask.callInternal() invokes 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status is 
[transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422].
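
Points 2 and 3 together can be illustrated in isolation. The following is a 
minimal sketch and not Flink code: SketchSnapshotDirectory and 
LocalSnapshotLeakSketch are hypothetical names, and the sketch assumes only 
what the snippet above shows, namely that cleanup() deletes the folder only 
while its state is still ONGOING.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

/** Hypothetical stand-in for the snapshot directory state machine; not Flink code. */
class SketchSnapshotDirectory {
    enum State { ONGOING, COMPLETED, DELETED }

    private final Path directory;
    private final AtomicReference<State> state = new AtomicReference<>(State.ONGOING);

    SketchSnapshotDirectory(Path directory) {
        this.directory = directory;
    }

    /** Models the ONGOING -> COMPLETED transition that happens inside get(). */
    boolean complete() {
        return state.compareAndSet(State.ONGOING, State.COMPLETED);
    }

    /** Same guard as the quoted cleanup(): deletes only while the state is ONGOING. */
    boolean cleanup() {
        if (state.compareAndSet(State.ONGOING, State.DELETED)) {
            deleteRecursively(directory);
        }
        return true;
    }

    /** Deletes children before parents by walking the tree in reverse order. */
    static void deleteRecursively(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class LocalSnapshotLeakSketch {
    /** Returns true if the folder is still on disk after cleanup() has run. */
    static boolean leaksAfterCleanup(boolean snapshotCompletesFirst) {
        try {
            Path dir = Files.createTempDirectory("local-snapshot-sketch");
            Files.createFile(dir.resolve("sst-file"));
            SketchSnapshotDirectory snapshotDir = new SketchSnapshotDirectory(dir);
            if (snapshotCompletesFirst) {
                snapshotDir.complete(); // get() finished before cleanup() ran
            }
            snapshotDir.cleanup();
            boolean leaked = Files.exists(dir);
            if (leaked) {
                SketchSnapshotDirectory.deleteRecursively(dir); // tidy up the temp folder
            }
            return leaked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup before completion, leaked: " + leaksAfterCleanup(false));
        System.out.println("completion before cleanup, leaked: " + leaksAfterCleanup(true));
    }
}
```

Running main is expected to print false for the first case and true for the 
second: once the state has moved to COMPLETED, the compareAndSet in cleanup() 
fails and the folder stays on disk.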

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after the RocksDB AsyncSnapshotTask is cancelled, then:

(1) the localSnapshot folder can't be cleaned up by 
AsyncSnapshotCallable.cleanup(), because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned up by the finally block of 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry), because the 
completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
    try {
        .......
        completed = true;
        return snapshotResult;
    } finally {
        if (!completed) {
            .........
        }
    }
}
{code}
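
The cancellation side can be sketched with a plain 
java.util.concurrent.FutureTask. This is an illustration under assumptions, 
not the Flink implementation: CancelWhileRunningSketch and Outcome are 
hypothetical names, the latch stands in for the snapshot work, and 
cancel(false) is used so that the callable is not interrupted, which models a 
get() call that encounters no exception after the cancel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo of cancelling a FutureTask whose callable is already running. */
class CancelWhileRunningSketch {

    /** What cancel() returned, and what the callable did after the cancel. */
    static final class Outcome {
        final boolean cancelReturnedTrue;
        final boolean callableCompleted;
        final boolean finallyCleanupRan;

        Outcome(boolean cancelReturnedTrue, boolean callableCompleted, boolean finallyCleanupRan) {
            this.cancelReturnedTrue = cancelReturnedTrue;
            this.callableCompleted = callableCompleted;
            this.finallyCleanupRan = finallyCleanupRan;
        }
    }

    static Outcome run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);
        AtomicBoolean callableCompleted = new AtomicBoolean(false);
        AtomicBoolean finallyCleanupRan = new AtomicBoolean(false);

        FutureTask<String> task = new FutureTask<>(() -> {
            boolean completed = false;
            try {
                started.countDown();
                mayFinish.await(); // stands in for the snapshot work
                completed = true;
                callableCompleted.set(true);
                return "snapshot-result";
            } finally {
                if (!completed) {
                    // Mirrors the guarded cleanup in get(): skipped once completed is true.
                    finallyCleanupRan.set(true);
                }
            }
        });

        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();                        // the callable is now running
            boolean cancelled = task.cancel(false); // cancel without interrupting
            mayFinish.countDown();                  // let the callable run to its end
            worker.join();
            return new Outcome(cancelled, callableCompleted.get(), finallyCleanupRan.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Outcome o = run();
        System.out.println("cancel() returned true: " + o.cancelReturnedTrue);
        System.out.println("callable completed: " + o.callableCompleted);
        System.out.println("finally cleanup ran: " + o.finallyCleanupRan);
    }
}
```

In this sketch cancel() returns true even though the callable still runs to 
its end with completed set to true, so the guarded cleanup in the finally 
block is skipped and the caller has no result left to discard.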
 



> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-28515
>                 URL: https://issues.apache.org/jira/browse/FLINK-28515
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.15.1, 1.16.0
>            Reporter: Jinzhong Li
>            Assignee: Jinzhong Li
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: C7245668-CE31-4F56-B9CB-12E2F1E900C5.png, 
> image-2022-07-19-18-28-20-239.png, image.png
>
>
> In my case, I found that some files in the local recovery directory were not 
> cleaned up properly after a checkpoint abort (as shown in the attached picture).
> By analyzing the Flink log, I found that when the state backend has completed 
> the local snapshot but the task has not yet completed the whole snapshot, 
> and the checkpoint is then aborted (because of a checkpoint timeout or a 
> network error), files in the local directory may not be cleaned up properly.
> I think the leftover local snapshot files are caused by the following:
> (1) In org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable, 
> the completed localSnapshot info is registered into 
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl only after the task has 
> completed the whole snapshot 
> ([AsyncCheckpointRunnable.java#L136|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L136]).
> (2) If the state backend has completed the local snapshot but the task has not 
> completed the entire snapshot when the checkpoint abort is triggered, the 
> TaskLocalStateStore can't clean up the unregistered localSnapshot files 
> ([TaskLocalStateStoreImpl.java#L301|https://github.com/apache/flink/blob/3ec376601f836df6314e771b243ca6f896a7f642/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L301]).
> (3) When SubtaskCheckpointCoordinatorImpl receives the abort notification, 
> it cancels all the ongoing state snapshot FutureTasks in 
> 'AsyncCheckpointRunnable.close()'. For RocksDBKeyedStateBackend, 
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129] 
> will be invoked during the checkpoint abort. After this, 
> [RocksDBIncrementalSnapshotOperation.get|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L384] 
> may still run until it completes.
> The localSnapshot files then can't be cleaned up in the finally block of 
> RocksDBIncrementalSnapshotOperation.get or in 
> AsyncSnapshotCallable.call ([finally-cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87]).
> Nor can they be cleaned up in 
> [AsyncCheckpointRunnable.cleanup|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java#L391], 
> because 
> [AsyncSnapshotTask.cancel|https://github.com/apache/flink/blob/dcc9cceab962c897c7a5e55deb868eb2d86468ec/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java#L78] 
> returns true.
>  
> To fix this problem, I think that when TaskLocalStateStoreImpl aborts a 
> checkpoint, we can try to delete the corresponding localRecovery directory, 
> even if the checkpoint has not been registered in TaskLocalStateStoreImpl.
>  
>  
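
The fix proposed in the description could look roughly like the following. 
This is a hedged sketch, not the actual patch: SketchLocalStateStore, its 
method names, and the chk_<id> directory naming are assumptions made for 
illustration. The only idea taken from the description is that aborting a 
checkpoint deletes its local directory whether or not the checkpoint was 
registered.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/** Hypothetical stand-in for a task-local state store; not Flink code. */
class SketchLocalStateStore {
    private final Path baseDirectory;
    private final Set<Long> registeredCheckpoints = ConcurrentHashMap.newKeySet();

    SketchLocalStateStore(Path baseDirectory) {
        this.baseDirectory = baseDirectory;
    }

    /** Assumed layout: one sub-directory per checkpoint id. */
    Path checkpointDirectory(long checkpointId) {
        return baseDirectory.resolve("chk_" + checkpointId);
    }

    void register(long checkpointId) {
        registeredCheckpoints.add(checkpointId);
    }

    /**
     * Drops the registration (if any) and deletes the checkpoint directory even
     * when the checkpoint was never registered. Returns true if a directory was
     * found and deleted.
     */
    boolean abortCheckpoint(long checkpointId) {
        registeredCheckpoints.remove(checkpointId);
        Path dir = checkpointDirectory(checkpointId);
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order deletes children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }
}

class AbortCleanupSketch {
    /** Creates an unregistered checkpoint directory, aborts it, and reports whether it is gone. */
    static boolean unregisteredDirectoryIsRemovedOnAbort() {
        try {
            Path base = Files.createTempDirectory("local-recovery-sketch");
            SketchLocalStateStore store = new SketchLocalStateStore(base);
            Path dir = Files.createDirectory(store.checkpointDirectory(7L));
            Files.createFile(dir.resolve("sst-file"));
            boolean deleted = store.abortCheckpoint(7L); // checkpoint 7 was never registered
            boolean gone = !Files.exists(dir);
            Files.delete(base); // tidy up the now empty temp directory
            return deleted && gone;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unregistered directory removed on abort: "
                + unregisteredDirectoryIsRemovedOnAbort());
    }
}
```

The sketch leaves one case open: a snapshot thread that is still writing into 
the directory when the abort arrives would need additional coordination.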



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
