Hello Flink! We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.
Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism) During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress. Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that? Thank you! links https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L240 https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L70