Roman Khachatryan created FLINK-35769:
-----------------------------------------

             Summary: State files might not be deleted on task cancellation
                 Key: FLINK-35769
                 URL: https://issues.apache.org/jira/browse/FLINK-35769
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.19.1
            Reporter: Roman Khachatryan
            Assignee: Roman Khachatryan
             Fix For: 1.20.0


We have a job in an infinite (fast) restart loop that crashes with a 
serialization issue.
Each restart seems to leak state files, because the files from the previous 
run are not cleaned up:

{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
689}}
Eventually the TaskManager (TM) will use too much disk space.

 

The problem is in 
[https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly);
{code}
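Here {{FileUtils::deleteDirectoryQuietly}} lists the files in each download 
destination and deletes them.
But if the wait on {{completeAll}} is interrupted, the download runnables may 
still be running, so one of them might re-create the directory (or files in 
it) after the cleanup has already run.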
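
To illustrate the ordering issue described above, here is a minimal, self-contained sketch. It is not Flink code and not necessarily the fix that was applied: {{DownloadCleanupSketch}}, {{startDownload}}, {{cancelAwaitAndDelete}} and the {{AtomicBoolean}} cancellation flag are hypothetical names invented for this example. The assumption is that cleanup is only safe once every in-flight download task has stopped writing, so the sketch signals cancellation, waits for all tasks, and only then deletes the destinations.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.*;

class DownloadCleanupSketch {

    // Hypothetical stand-in for a download runnable: keeps writing files into
    // its destination (re-creating the directory if needed) until cancelled.
    static CompletableFuture<Void> startDownload(
            Path dir, int files, AtomicBoolean cancelled, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            try {
                for (int i = 0; i < files && !cancelled.get(); i++) {
                    Files.createDirectories(dir);
                    Files.write(dir.resolve("chunk-" + i), new byte[] {1});
                    Thread.sleep(2);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    // Deletes a directory tree, children before parents.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.deleteIfExists(p);
        }
    }

    // Cleanup order under test: signal cancellation, wait until no task can
    // write any more, and only then delete the destinations.
    static void cancelAwaitAndDelete(
            List<CompletableFuture<Void>> futures, AtomicBoolean cancelled, List<Path> dirs)
            throws IOException {
        cancelled.set(true);
        for (CompletableFuture<Void> f : futures) {
            try {
                f.join(); // unlike get(), join() does not throw InterruptedException
            } catch (CompletionException | CancellationException ignored) {
                // a failed task has stopped writing, which is all we need here
            }
        }
        for (Path d : dirs) {
            deleteRecursively(d);
        }
    }

    // Simulates a failure while downloads are in flight; returns true if no
    // destination directory is left behind afterwards.
    static boolean runScenario() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Path base = Files.createTempDirectory("download-sketch");
            AtomicBoolean cancelled = new AtomicBoolean(false);
            List<Path> dirs = new ArrayList<>();
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                Path dir = base.resolve("handle-" + i);
                dirs.add(dir);
                futures.add(startDownload(dir, 10_000, cancelled, pool));
            }
            Thread.sleep(50); // let the downloads write a few files, then "fail"
            cancelAwaitAndDelete(futures, cancelled, dirs);
            boolean clean = dirs.stream().noneMatch(Files::exists);
            deleteRecursively(base);
            return clean;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("destinations cleaned up: " + runScenario());
    }
}
```

If the deletion in {{cancelAwaitAndDelete}} were moved before the loop that waits on the futures, a task that is still mid-iteration could call {{Files.createDirectories}} again after the delete, which is the kind of leak reported here.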



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
