RE: Checkpoint directories not cleared as TaskManagers run

2022-05-19 Thread Schwalbe Matthias
Hi James,

Let me give some short answers (there is documentation that describes this in 
more detail):

>> - why do taskmanagers create the chk-x directory but only the jobmanager can 
>> delete it? Shouldn’t the jobmanager be the only component creating and 
>> deleting these directories? That would seem more consistent to me but maybe 
>> there is a reason.

  *   Assuming a proper setup, i.e. the checkpoint directory is on a shared folder
  *   Tasks and their state are split into subtasks across separate slots 
(according to parallelism)
  *   When a checkpoint is written, each state primitive on each respective 
subtask writes its portion of state to the checkpoint folder and forwards the 
filename to the jobmanager
  *   For incremental checkpoints some files also remain in older checkpoint 
folders until they become obsolete
  *   This process is managed by the jobmanager
  *   At the end of each checkpoint, the jobmanager writes a _metadata file to 
the respective checkpoint folder containing (simplified) the filenames of the 
respective state files plus any small state inline
  *   When a new checkpoint is finished, the jobmanager decides, according to 
configuration, which old checkpoint files are obsolete and deletes them
  *   In general checkpoints and savepoints are for high-availability purposes; 
if the checkpoint data were on a local folder of a machine that crashed, it 
would not be available to restart the job
  *   The parts that should be on a local (and fast) drive are the ones used by 
RocksDB; these are ephemeral and can (and will) be recreated on job recovery 
(see the sketch at the end of this list)
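  *   For illustration, a minimal sketch of that kind of setup. The shared 
mount path, the retained-checkpoint count and the RocksDB storage location are 
placeholder assumptions (not values from your deployment), and the RocksDB 
backend needs the flink-statebackend-rocksdb dependency:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // how many completed checkpoints the jobmanager keeps before deleting older ones
    conf.setInteger("state.checkpoints.num-retained", 3);

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.enableCheckpointing(60 * 1000);

    // checkpoint files go to a folder the jobmanager and all taskmanagers can reach
    env.getCheckpointConfig().setCheckpointStorage("file:///mnt/flink-shared/checkpoints");

    // RocksDB working files are ephemeral and belong on a fast local disk
    EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental
    backend.setDbStoragePath("/tmp/flink/rocksdb");
    env.setStateBackend(backend);
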
>>  - I see many files under each chk-x folder. Can anyone confirm if each file 
>> is wholly owned by a single task manager? ie is each file only written by 1 
>> TM? Otherwise there could be file locking and contention.

  *   Mostly explained above … however
  *   If two taskmanagers happen to be started on the same machine (uncommon 
for k8s, common for the Yarn resource manager), they would use the same folder
  *   Filenames contain a UUID, which is unlikely to collide
>> - we are now looking to add in NFS mounts for our containers so all the job 
>> managers and taskmanagers share the same path. Can anyone confirm if NFS is 
>> a ‘reliable’ storage mechanism as we have heard many stories how problematic 
>> it can be. We are not yet able to use HDFS or S3.

  *   NFS is not reliable and probably not fit for PROD purposes; I don't know 
whether some NAS setups that expose NFS have integrated reliability …
>> - if Flink can not write to NFS my understanding is although the checkpoint 
>> will fail the Flink process will carry on and try again at the next 
>> checkpoint. It will not cause my program to fail correct?

  *   Imho there would be no reason to set up checkpointing in the first place 
if you cannot restart a job from such a checkpoint
  *   This only matters, of course, if you need reliability or exactly-once 
semantics … (see the sketch just below)
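  *   For the narrower question of whether a single failed checkpoint stops the 
job: the number of checkpoint failures Flink tolerates before failing the job 
is configurable. A minimal sketch using your env (the value 5 is only an 
example):

    // allow up to 5 consecutive checkpoint failures before the job itself fails over
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);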

Thias


Re: Checkpoint directories not cleared as TaskManagers run

2022-05-18 Thread James Sandys-Lumsdaine
Hello Matthias,

Thanks for your reply. Yes indeed you are correct. My /tmp path is private to 
each container, so you have confirmed what I thought was happening.

I have some follow-up questions:
- why do taskmanagers create the chk-x directory when only the jobmanager can 
delete it? Shouldn't the jobmanager be the only component creating and deleting 
these directories? That would seem more consistent to me, but maybe there is a 
reason.
- I see many files under each chk-x folder. Can anyone confirm whether each 
file is wholly owned by a single taskmanager, i.e. is each file only written by 
one TM? Otherwise there could be file locking and contention.
- we are now looking to add NFS mounts for our containers so all the 
jobmanagers and taskmanagers share the same path. Can anyone confirm whether 
NFS is a 'reliable' storage mechanism, as we have heard many stories about how 
problematic it can be. We are not yet able to use HDFS or S3.
- if Flink cannot write to NFS, my understanding is that although the 
checkpoint will fail, the Flink process will carry on and try again at the next 
checkpoint. It will not cause my program to fail, correct?

Many thanks again,

James.

Sent from my iPhone


RE: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Schwalbe Matthias
Hi James,

From reading the thread … I assume your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by the jobmanager and all taskmanagers in order to work
- as your jobmanager cannot access the checkpoint files of the taskmanagers, it 
also cannot clean up those files (see the sketch below)
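- a minimal sketch of pointing both at a commonly accessible mount (the path 
file:///mnt/flink-shared is only a placeholder for whatever shared folder you 
use; the same two keys can equally go into flink-conf.yaml):

    Configuration conf = new Configuration();
    conf.setString("state.checkpoints.dir", "file:///mnt/flink-shared/checkpoints");
    conf.setString("state.savepoints.dir", "file:///mnt/flink-shared/savepoints");
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);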

Hope that helps

Regards

Thias


Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) is reading and writing to its own /tmp, 
i.e. there is no shared access as there would be with NFS or HDFS.

So when the checkpointing happens, the directories are created and populated, 
but only the JM's old checkpoint directories are cleaned up. Each of the TMs' 
old "chk-x" directories under /tmp/Flink/State remains and is not cleared up.

From your email I can't tell whether you think I am writing to a "shared" path 
or not.

I started looking at the in-memory checkpoint storage (sketched below), but 
that has a maximum size specified as an int, so it can't hold my 5 GB of state. 
I need the checkpointing to trigger my sinks to persist (GenericWriteAheadSink), 
so it seems I have to create a proper shared file path all my containers can 
access.
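
For reference, the in-memory variant I looked at is roughly this (just a 
sketch - the 512 MB figure is an arbitrary example, and the cap is an int 
number of bytes, which is why my 5 GB of state doesn't fit):

    import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;

    // checkpoint data is kept on the jobmanager heap rather than in files;
    // maxStateSize is an int (bytes), so total checkpointed state stays well under 5 GB
    env.getCheckpointConfig().setCheckpointStorage(
        new JobManagerCheckpointStorage(512 * 1024 * 1024));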

James.


Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Hangxiang Yu
Hi, James.
I may not be getting what the problem is.
All checkpoints will be stored in the location you set.
IIUC, TMs write some checkpoint data to their local dir, then upload it to that
location and delete the local copy.
The JM writes checkpoint metadata to the location and also performs the
deletion of entire checkpoints.

Best,
Hangxiang.



Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Some further Googling turned up a StackOverflow posting saying it is the 
jobmanager that does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share, so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could 
clear up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment, so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now that 
I understand what's going on a bit better, it seems pointless for me to have 
file checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate it.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under the 
job id folder. So the first checkpoint is "chk-1", which is then replaced with 
"chk-2", etc.


However, when I run it as a proper application-mode deployment, each of the 4 
taskmanagers running in their own containers retains every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.