Hi Yun,

First of all, sorry for the naming mistake; it was a typo.

> How to judge that they are related to specific checkpoint?

I judged this by removing the files and restarting the job to see whether it fails.

In the code below I left out the privateState, which is why I was missing files.
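
Once the set of file names referenced by _metadata is complete (shared and private state alike), the comparison step Yun described can be sketched roughly as follows. This is only a minimal sketch using plain java.nio: the class OrphanedStateFileFinder and its method are hypothetical names and not part of Flink, and it assumes the referenced names have already been collected and that the cutoff is the time of the latest completed checkpoint.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; it is not part of Flink.
class OrphanedStateFileFinder {

    /**
     * Returns the names of regular files directly under dir that are not in
     * referencedNames and whose last-modified time is strictly older than cutoff.
     * Assumption: referencedNames was built from the latest _metadata file and
     * cutoff is the time of the latest completed checkpoint.
     */
    static Set<String> findDeletionCandidates(Path dir,
                                              Set<String> referencedNames,
                                              FileTime cutoff) throws IOException {
        Set<String> candidates = new TreeSet<>();
        try (Stream<Path> files = Files.list(dir)) {
            for (Path file : (Iterable<Path>) files::iterator) {
                if (!Files.isRegularFile(file)) {
                    continue; // skip sub-directories such as chk-*
                }
                String name = file.getFileName().toString();
                boolean unreferenced = !referencedNames.contains(name);
                boolean olderThanCutoff =
                        Files.getLastModifiedTime(file).compareTo(cutoff) < 0;
                // Files newer than the cutoff may belong to a checkpoint in progress.
                if (unreferenced && olderThanCutoff) {
                    candidates.add(name);
                }
            }
        }
        return candidates;
    }
}
```

The method only reports candidates and deletes nothing, so the result can be reviewed before any file is removed.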

What about the recovery folders? How can I know when, and which, files can be removed?
I see the folder contains recovery -<job-name> and a list of entries such as
completedCheckpoint3bc1020bb0f9

Best,
Shachar

On 2020/04/20 12:14:35, Yun Tang <myas...@live.com> wrote: 
> Hi Shachar
> 
> You can refer to [1] for the directory structure. The files (usually 
> ByteStreamStateHandle) which are not in the shared folder are exclusive state, 
> such as operator state or exclusive files uploaded during each incremental 
> checkpoint. Actually, I don't understand why you say some files are 
> not mentioned in the metadata file but are related to the checkpoint. How to 
> judge that they are related to specific checkpoint?
> 
> BTW, my name is "Yun" which means cloud in Chinese, not the delicious "Yum" 🙂
> 
> Best
> Yun Tang
> ________________________________
> From: Shachar Carmeli <carmeli....@gmail.com>
> Sent: Monday, April 20, 2020 15:36
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Flink incremental checkpointing - how long does data is kept in 
> the share folder
> 
> Hi Yum
> I noticed that some files are related to the checkpoint but are not 
> mentioned in the metadata file,
> and some of the files that are referenced in the metadata file (usually 
> ByteStreamStateHandle) are not in the shared folder.
> Can you explain this behaviour?
> 
> See the code I was using:
>
> final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in,
>         CheckpointTool.class.getClassLoader());
>
> final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
>         .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
>                 .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
>                         .flatMap(keyedStateHandle -> ((IncrementalKeyedStateHandle) keyedStateHandle)
>                                 .getSharedState().values().stream()
>                                 .map(streamStateHandle -> {
>                                     totalSize[0] += streamStateHandle.getStateSize();
>                                     String name = null;
>                                     if (streamStateHandle instanceof FileStateHandle) {
>                                         name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
>                                     } else {
>                                         final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
>                                         name = new File(handleName).getName();
>                                     }
>                                     return name;
>                                 }))))
>         .collect(Collectors.toSet());
> 
> Thanks in advance
> Shachar
> 
> On 2020/04/13 14:30:40, Yun Tang <myas...@live.com> wrote:
> > Hi Shachar
> >
> > I think you could refer to [1] for the directory structure of 
> > checkpoints. The '_metadata' file records which checkpointed data files 
> > belong to the checkpoint, e.g. file paths under the 'shared' folder. As I 
> > said before, you need to call Checkpoints#loadCheckpointMetadata to load 
> > '_metadata' to know which files belong to that checkpoint.
> >
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Shachar Carmeli <carmeli....@gmail.com>
> > Sent: Sunday, April 12, 2020 15:32
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: Re: Flink incremental checkpointing - how long does data is kept 
> > in the share folder
> >
> > Thank you for the quick response.
> > Your answer relates to the checkpoint folder that contains the _metadata 
> > file, e.g. chk-1829.
> > What about the "shared" folder? How do I know which files in that folder 
> > are still relevant and which are left over from a failed checkpoint? They 
> > are not directly related to the _metadata checkpoint, or am I missing 
> > something?
> >
> >
> > On 2020/04/07 18:37:57, Yun Tang <myas...@live.com> wrote:
> > > Hi Shachar
> > >
> > > Why do we see data that is older than the lateness configuration?
> > > There might be three reasons:
> > >
> > >   1.  RocksDB really still needs that file in the current checkpoint. If we 
> > > uploaded a file named 42.sst on 2/4 in some old checkpoint, the current 
> > > checkpoint could still include that 42.sst file if it has 
> > > never been compacted since then. This is possible in theory.
> > >   2.  Your checkpoint size is large and the checkpoint coordinator could not 
> > > remove the files fast enough before exiting.
> > >   3.  That file was created by a crashed task manager and is not known to the 
> > > checkpoint coordinator.
> > >
> > > How do I know that the files belong to a valid checkpoint and not to a 
> > > checkpoint of a crashed job, so that we can delete those files?
> > > You have to call Checkpoints#loadCheckpointMetadata[1] to load the latest 
> > > _metadata in the checkpoint directory and compare its file paths with the 
> > > files currently in the checkpoint directory. The ones that are not in the 
> > > checkpoint metadata and are older than the latest checkpoint can be removed. 
> > > You could follow this to debug, or maybe I could write a tool later to help 
> > > identify which files can be deleted.
> > >
> > > [1] 
> > > https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Shachar Carmeli <carmeli....@gmail.com>
> > > Sent: Tuesday, April 7, 2020 16:19
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: Flink incremental checkpointing - how long does data is kept in 
> > > the share folder
> > >
> > > We are using Flink 1.6.3 and keeping the checkpoints in Ceph, retaining 
> > > only one checkpoint at a time, using incremental checkpointing with RocksDB.
> > >
> > > We run windows with a lateness of 3 days, which means we expect that 
> > > no data in the checkpoint shared folder will be kept after 3-4 days. Still, 
> > > we see data that is older than that,
> > > e.g.
> > > if today is 7/4 there are some files from 2/4.
> > >
> > > Sometimes we see checkpoints that we assume (because their 
> > > index number is not in sync) belong to a job that crashed, where 
> > > the checkpoint was not used to restore the job.
> > >
> > > My questions are:
> > >
> > > Why do we see data that is older than the lateness configuration?
> > > How do I know that the files belong to a valid checkpoint and not to a 
> > > checkpoint of a crashed job, so that we can delete those files?
> > >
> >
> 
