Re: Externalized checkpoints
Got it. Thank you.

On Thu, Aug 22, 2019 at 8:54 PM Congxian Qiu wrote:

> Hi Vishwas,
>
> As Zhu Zhu said, you can set "state.checkpoints.num-retained" [1] to
> specify the maximum number of completed checkpoints to retain.
> You can also refer to the externalized checkpoint cleanup type [2]
> configuration for how retained checkpoints are cleaned up.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
> [2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints
>
> Best,
> Congxian
Re: Externalized checkpoints
Hi Vishwas,

As Zhu Zhu said, you can set "state.checkpoints.num-retained" [1] to specify the maximum number of completed checkpoints to retain.
You can also refer to the externalized checkpoint cleanup type [2] configuration for how retained checkpoints are cleaned up.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints

Best,
Congxian

On Thu, Aug 22, 2019 at 10:13 AM Zhu Zhu wrote:

> Hi Vishwas,
>
> You can configure "state.checkpoints.num-retained" to specify the max
> checkpoints to retain.
> By default it is 1.
>
> Thanks,
> Zhu Zhu
Re: Externalized checkpoints
Hi Vishwas,

You can configure "state.checkpoints.num-retained" to specify the maximum number of checkpoints to retain.
By default it is 1.

Thanks,
Zhu Zhu

On Thu, Aug 22, 2019 at 6:48 AM Vishwas Siravara wrote:

> I am also using exactly-once checkpointing mode. I have a Kafka source and
> sink, both of which support transactions, which should allow for
> exactly-once processing. Is this the reason why only one checkpoint is
> retained?
>
> Thanks,
> Vishwas
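To illustrate the retention behavior described in this reply, here is a minimal Python sketch. It is a toy model, not Flink code: the function name retained_checkpoints and the bounded deque are assumptions made for illustration. Only the option name "state.checkpoints.num-retained" and its default of 1 come from the thread.

```python
from collections import deque


def retained_checkpoints(completed_ids, num_retained=1):
    """Toy model of a bounded store of completed checkpoints.

    Each newly completed checkpoint is appended. Once more than
    `num_retained` are held, the oldest one is dropped, which mirrors
    an older chk-<n> directory disappearing from the bucket.
    """
    store = deque(maxlen=num_retained)
    for checkpoint_id in completed_ids:
        store.append(checkpoint_id)
    return list(store)


# Checkpoints 1..12 complete in order, as in the bucket listing (chk-6, later chk-12).
print(retained_checkpoints(range(1, 13)))                  # [12]
print(retained_checkpoints(range(1, 13), num_retained=3))  # [10, 11, 12]
```

With the default of 1, only chk-12 is left after the twelfth checkpoint completes, which matches what the bucket listing shows. A larger value keeps a short history to restore from.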
Re: Externalized checkpoints
I am also using exactly-once checkpointing mode. I have a Kafka source and sink, both of which support transactions, which should allow for exactly-once processing. Is this the reason why only one checkpoint is retained?

Thanks,
Vishwas

On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara wrote:

> Hi peeps,
> I am externalizing checkpoints to S3 for my Flink job and I retain them on
> cancellation. However, when I look into the S3 bucket where the checkpoints
> are stored, there is only one checkpoint at any point in time. Is this the
> default behavior of Flink, where older checkpoints are deleted when the
> current checkpoint completes? Here are a few screenshots. What are your
> thoughts on restoring an older state which is not the previous state?
>
> List contents of bucket at time 0
>
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
> Last modified time: Wed Aug 21 22:17:23 GMT 2019
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
> Last modified time: Wed Aug 21 22:17:24 GMT 2019
>
> List contents of bucket at time 1
>
> Printing last modified times
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
> Last modified time: Wed Aug 21 22:23:24 GMT 2019
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>
> Thanks,
>
> Vishwas
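If several checkpoints are retained, restoring an older one comes down to choosing which chk-<n> directory to resume from. The Python sketch below assumes object keys shaped like the ones in the listing above (<prefix>/<job-id>/chk-<n>/_metadata, with a 32-character hexadecimal job ID) and picks the highest-numbered checkpoint per job. The function name and the regular expression are hypothetical helpers for illustration, not part of Flink or of any S3 client.

```python
import re

# Assumed key shape, taken from the bucket listing: <job-id>/chk-<n>/_metadata
_METADATA_KEY = re.compile(r"(?P<job>[0-9a-f]{32})/chk-(?P<n>\d+)/_metadata$")


def latest_checkpoint_per_job(keys):
    """Map each job ID to its highest-numbered chk-<n> directory that has a _metadata file."""
    best = {}
    for key in keys:
        match = _METADATA_KEY.search(key)
        if match is None:
            continue  # state data files and unrelated objects are skipped
        job, number = match.group("job"), int(match.group("n"))
        if job not in best or number > best[job][0]:
            # Strip the trailing "/_metadata" to get the checkpoint directory.
            best[job] = (number, key.rsplit("/", 1)[0])
    return {job: path for job, (_, path) in best.items()}


keys = [
    "checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4",
    "checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata",
    "checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518",
    "checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata",
]
print(latest_checkpoint_per_job(keys))
# {'fb9fea316bf2d530a6fc54ea107d66d4': 'checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12'}
```

The checkpoint numbers are compared as integers, so chk-12 sorts after chk-6. To pick an older checkpoint instead, the same parsing could collect all numbers per job rather than only the maximum.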
Re: Externalized checkpoints and metadata
Hi Juan,

I think you are right, and there may be more than 3 companies implementing different solutions for this. I created a ticket to address it here: https://issues.apache.org/jira/browse/FLINK-9260. I hope this helps to reduce others' redundant efforts on this (if it is eventually accepted by the community).

Best Regards,
Sihua Zhou

On 04/26/2018 16:35, Juan Gentile <j.gent...@criteo.com> wrote:

> Hello all,
>
> Thank you all for your responses. I’ll take a look at your code, Hao, and
> probably implement something similar.
>
> To know what we can expect from Flink in the future, I’d like to ask
> whether this issue will be addressed, considering that 3 different
> companies have already implemented different (but similar) solutions to
> the same problem. Maybe we could think of adding this issue here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints ?
>
> Thank you,
> Juan G.
Re: Externalized checkpoints and metadata
Hello all,

Thank you all for your responses. I’ll take a look at your code, Hao, and probably implement something similar.

To know what we can expect from Flink in the future, I’d like to ask whether this issue will be addressed, considering that 3 different companies have already implemented different (but similar) solutions to the same problem. Maybe we could think of adding this issue here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints ?

Thank you,
Juan G.

On Wednesday, 25 April 2018 at 20:25, hao gao <hao.x@gmail.com> wrote:

> Hi Juan,
>
> We modified the Flink code a little bit to change the Flink checkpoint
> structure so that we can easily identify which is which. You can read my
> note or the PR:
>
> https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed
> https://github.com/BranchMetrics/flink/pull/6/files
>
> Hope it helps.
>
> Thanks
> Hao
Re: Externalized checkpoints and metadata
Hi Juan,

We modified the Flink code a little bit to change the Flink checkpoint structure so that we can easily identify which is which. You can read my note or the PR:

https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed
https://github.com/BranchMetrics/flink/pull/6/files

Hope it helps.

Thanks
Hao

2018-04-25 6:07 GMT-07:00 Juan Gentile:

> Hello,
>
> We are trying to use externalized checkpoints, using RocksDB on Hadoop
> HDFS.
> We would like to know the proper way to resume from a saved checkpoint, as
> we are currently running many jobs in the same Flink cluster.
>
> The problem is that when we want to restart the jobs and pass the metadata
> file (or directory), there is one file per job, but the files are not
> easily identifiable based on the name. Example:
>
> /checkpoints/checkpoint_metadata-69053704a5ca
> /checkpoints/checkpoint_metadata-c7c016909607
>
> We are not using savepoints, and reading the documentation I see there are
> two ways to resume: one is passing the metadata file (not possible as we
> have many jobs) and the other is passing the directory.
> But by default it looks for a _metadata file, which doesn’t exist.
>
> Thank you,
> Juan G.

--
Thanks
- Hao
Re: Externalized Checkpoints vs Periodic Checkpoints
Hi Yassine,

A periodic checkpoint is a checkpoint that Flink triggers periodically. The checkpoint itself can have multiple properties, and one of them is whether the checkpoint is externalized or not.

An externalized checkpoint is a checkpoint for which Flink writes the meta information into a target directory. In contrast, for a non-externalized checkpoint Flink stores the checkpoint meta information only in memory. The former has the advantage that you don't lose the checkpoints if you shut down your cluster. Externalized checkpoints behave similarly to savepoints; in fact, savepoints are externalized checkpoints with some more properties.

At the moment, Flink's checkpoint coordinator only retains the last successfully completed checkpoint. This means that whenever a new checkpoint completes, the previously completed checkpoint is discarded. This also applies to externalized checkpoints.

Cheers,
Till

On Wed, Feb 1, 2017 at 2:03 PM, Yassine MARZOUGUI wrote:

> Hi all,
>
> Could someone clarify the difference between externalized checkpoints [1]
> and regular periodic checkpoints [2]?
> Moreover, I have a question regarding the retention of checkpoints: for
> regular checkpoints, does the last checkpoint discard the previous ones? If
> yes, is that also the case for externalized checkpoints? Thank you.
>
> Best,
> Yassine
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/checkpointing.html
Re: Externalized Checkpoints vs Periodic Checkpoints
Thank you, Till, for the clarification. That was helpful.

Best,
Yassine

2017-02-02 15:31 GMT+01:00 Till Rohrmann:

> Hi Yassine,
>
> A periodic checkpoint is a checkpoint that Flink triggers periodically.
> The checkpoint itself can have multiple properties, and one of them is
> whether the checkpoint is externalized or not.
>
> An externalized checkpoint is a checkpoint for which Flink writes the meta
> information into a target directory. In contrast, for a non-externalized
> checkpoint Flink stores the checkpoint meta information only in memory.
> The former has the advantage that you don't lose the checkpoints if you
> shut down your cluster. Externalized checkpoints behave similarly to
> savepoints; in fact, savepoints are externalized checkpoints with some
> more properties.
>
> At the moment, Flink's checkpoint coordinator only retains the last
> successfully completed checkpoint. This means that whenever a new
> checkpoint completes, the previously completed checkpoint is discarded.
> This also applies to externalized checkpoints.
>
> Cheers,
> Till