Some further Googling turned up a StackOverflow post saying that it is the jobmanager, not the taskmanagers, that deletes old checkpoints.

Currently my taskmanagers write their checkpoints to their own private disks (/tmp) rather than to a shared location, so I suspect the jobmanager can't access the folders on the other machines. I had thought the taskmanagers would clean up their own state when instructed to by the jobmanager.

I can't use an NFS mount in my deployment yet, so I may have to switch to keeping checkpoint state on the jobmanager's heap instead of using file-based checkpoint storage. Now that I understand what's going on a bit better, it seems pointless to have file checkpoints that the jobmanager can't read for failover.
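The underlying rule is that a checkpoint directory must be reachable under the same path by the jobmanager and every taskmanager. A "file:" URI such as "file:/tmp/Flink/State" only satisfies that when the path is a mount shared by all machines. As a minimal sketch, assuming you want to catch this before submitting a job, a hypothetical pre-flight check (CheckpointPathCheck is not a Flink API) could flag local-file URIs. It treats a URI without a scheme as local too, on the assumption that Flink's default file system is left at its local default. The hdfs/s3 hosts in the example are made up.

```java
import java.net.URI;

// Hypothetical helper, not part of Flink: flags checkpoint URIs that point at
// the local file system of whichever machine happens to resolve them.
public class CheckpointPathCheck {

    /**
     * Returns true if the URI refers to the local file system. Such a path is
     * only safe on a multi-machine cluster if it is a shared mount, which this
     * method cannot detect; it only flags the case for attention.
     */
    public static boolean isLocalFileUri(String checkpointUri) {
        String scheme = URI.create(checkpointUri).getScheme();
        // Assumption: no scheme means the default file system, taken here to be local.
        return scheme == null || scheme.equalsIgnoreCase("file");
    }

    public static void main(String[] args) {
        System.out.println(isLocalFileUri("file:/tmp/Flink/State"));            // true
        System.out.println(isLocalFileUri("hdfs://namenode:8020/flink/chk"));   // false (made-up host)
        System.out.println(isLocalFileUri("s3://my-bucket/flink/checkpoints")); // false (made-up bucket)
    }
}
```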

If anyone can clarify or correct me, I would appreciate it.

James.
________________________________
From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org <user@flink.apache.org>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and never get cleaned up.


When I run from my own IDE, I see only the latest "chk-x" directory under the job ID folder: the first checkpoint is "chk-1", which is then replaced by "chk-2", and so on.
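The IDE behaviour matches Flink's default retention policy: only the most recent completed checkpoint is kept ("state.checkpoints.num-retained", default 1) and older "chk-x" directories are deleted. In the IDE everything presumably runs in one process against one /tmp, so that deletion reaches every directory. The sketch below models the "keep the newest N" policy in plain Java; CheckpointRetention and staleCheckpoints are hypothetical names for illustration, not Flink API. The "shared" and "taskowned" entries stand in for the other subfolders Flink creates next to the "chk-x" directories and are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of a "retain the newest N checkpoints" policy; not Flink code.
public class CheckpointRetention {

    // Matches directory names such as "chk-17"; the number is the checkpoint ID.
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)");

    /** Returns the chk-x names that would be deleted, newest first; other names are ignored. */
    public static List<String> staleCheckpoints(List<String> dirNames, int numRetained) {
        List<String> chkDirs = new ArrayList<>();
        for (String name : dirNames) {
            if (CHK.matcher(name).matches()) {
                chkDirs.add(name);
            }
        }
        // Sort numerically by checkpoint ID, newest first (so "chk-10" beats "chk-9").
        chkDirs.sort((a, b) -> Long.compare(checkpointId(b), checkpointId(a)));
        if (chkDirs.size() <= numRetained) {
            return new ArrayList<>();
        }
        return new ArrayList<>(chkDirs.subList(numRetained, chkDirs.size()));
    }

    private static long checkpointId(String name) {
        Matcher m = CHK.matcher(name);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + name);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        List<String> dirs = List.of("chk-1", "chk-2", "chk-10", "shared", "taskowned");
        System.out.println(staleCheckpoints(dirs, 1)); // [chk-2, chk-1]
    }
}
```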


However, when I run it as a proper application mode deployment, each of the 4 taskmanagers (each in its own container) retains every "chk-x" directory, so they consume more and more disk space as time progresses. Interestingly, the jobmanager itself is fine.


Does anyone have any suggestions on how to debug this? Is there anything obvious that would cause such behaviour? I'm currently using Flink 1.14.0.
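One way to debug this is to measure the build-up directly on each machine. The sketch below is a hypothetical standalone helper (CheckpointDirScanner is not part of Flink) that lists the "chk-x" directories under a job's checkpoint folder, e.g. /tmp/Flink/State/<job-id>, oldest first, with their size on disk. Running it periodically on the jobmanager and on each taskmanager container should show whether old directories are removed everywhere or only where the jobmanager runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical debugging helper, not part of Flink.
public class CheckpointDirScanner {

    /** Returns the chk-x subdirectories of jobDir, ordered by checkpoint ID (oldest first). */
    public static List<Path> checkpointDirs(Path jobDir) throws IOException {
        try (Stream<Path> children = Files.list(jobDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .sorted((a, b) -> Long.compare(id(a), id(b)))
                    .collect(Collectors.toList());
        }
    }

    /** Total size in bytes of all regular files below dir. */
    public static long sizeOf(Path dir) throws IOException {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile)
                    .mapToLong(p -> {
                        try {
                            return Files.size(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .sum();
        }
    }

    // "chk-17" -> 17
    private static long id(Path chkDir) {
        return Long.parseLong(chkDir.getFileName().toString().substring("chk-".length()));
    }

    public static void main(String[] args) throws IOException {
        // Pass the job folder, e.g. /tmp/Flink/State/<job-id>
        Path jobDir = Paths.get(args[0]);
        List<Path> dirs = checkpointDirs(jobDir);
        for (Path d : dirs) {
            System.out.printf("%s\t%d bytes%n", d.getFileName(), sizeOf(d));
        }
        System.out.println(dirs.size() + " chk-x directories");
    }
}
```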


My setup is essentially as follows (trimmed for simplicity):

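        import org.apache.flink.configuration.ConfigConstants;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
        import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // Trigger a checkpoint every 5 s, but leave at least 10 s between checkpoints
        env.enableCheckpointing(5 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

        // Working state on the heap; checkpoints written to a local file path
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");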


Thanks in advance,

James.
