After some further Googling, I found a StackOverflow post saying it is the jobmanager that does the deletion, not the taskmanagers.
Currently my taskmanagers are writing their checkpoints to their own private disks (/tmp) rather than a share, so my suspicion is that the jobmanager can't access the folder on the other machines. I thought the taskmanagers could clear up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment, so I may have to switch to heap checkpoint state instead of using the file storage checkpoint system. Now that I understand what's going on a bit better, it seems pointless for me to have file checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate it.

James.

________________________________
From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org <user@flink.apache.org>
Subject: Checkpoint directories not cleared as TaskManagers run

Hello,

I'm seeing my Flink deployment's checkpoint storage directories build up and never clear down.

When I run from my own IDE, I see only the latest "chk-x" directory under the job id folder. So the first checkpoint is "chk-1", which is then replaced with "chk-2" etc.

However, when I run as a proper application mode deployment, each of the 4 taskmanagers running in their own containers retains every one of the "chk-x" directories, meaning they eat a lot of disk space as time progresses. Interestingly, the jobmanager itself is fine.

Does anyone have any suggestion on how to debug this? Anything obvious that would cause such behaviour? I'm currently using Flink 1.14.0.

My setup is essentially as below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");

Thanks in advance,
James.
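
As a possible aid for the debugging question above, the retained "chk-x" directories under a job id folder can be listed on each taskmanager container and compared. The sketch below is a hypothetical standalone helper using only the Java standard library; it is not part of Flink, and the class name CheckpointDirs, the method listCheckpointIds, and the assumed layout <checkpoint root>/<job id>/chk-<n> are illustrative assumptions based on the description in the message.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical debugging helper (not a Flink API): lists the "chk-<n>"
// directories found directly under a job id folder.
final class CheckpointDirs {

    // Returns the numeric ids of all "chk-<n>" directories directly under
    // jobDir, sorted ascending. Plain files and other folders are ignored.
    static List<Long> listCheckpointIds(Path jobDir) throws IOException {
        List<Long> ids = new ArrayList<>();
        try (Stream<Path> children = Files.list(jobDir)) {
            for (Path child : (Iterable<Path>) children::iterator) {
                String name = child.getFileName().toString();
                if (Files.isDirectory(child) && name.matches("chk-\\d+")) {
                    ids.add(Long.parseLong(name.substring("chk-".length())));
                }
            }
        }
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java CheckpointDirs <job id folder>");
            return;
        }
        // Assumed layout: <checkpoint root>/<job id>/chk-<n>
        Path jobDir = Paths.get(args[0]);
        List<Long> ids = listCheckpointIds(jobDir);
        System.out.println("chk-x directories under " + jobDir + ": " + ids);
        if (ids.size() > 1) {
            // Several directories may simply mean a checkpoint is in progress;
            // a steadily growing list suggests old checkpoints are not removed.
            System.out.println("More than one chk-x directory is present.");
        }
    }
}
```

Running it against the job id folder on the jobmanager and on each taskmanager should show whether only the taskmanager-local folders keep growing.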