Re: Externalized checkpoints

2019-08-25 Thread Vishwas Siravara
Got it. Thank you.

On Thu, Aug 22, 2019 at 8:54 PM Congxian Qiu  wrote:

> Hi, Vishwas
>
> As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to
> specify the maximum number of completed checkpoints to retain.
> You can also refer to the externalized checkpoint cleanup type config[2]
> for how the retained checkpoints are cleaned up.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints
> Best,
> Congxian
>
>
> On Thu, Aug 22, 2019 at 10:13 AM, Zhu Zhu wrote:
>
>> Hi Vishwas,
>>
>> You can configure "state.checkpoints.num-retained" to specify the
>> maximum number of checkpoints to retain.
>> By default it is 1.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Thu, Aug 22, 2019 at 6:48 AM, Vishwas Siravara wrote:
>>
>>> I am also using exactly-once checkpointing mode. I have a Kafka source
>>> and sink, both of which support transactions, which should allow for
>>> exactly-once processing. Is this the reason why there is only one
>>> checkpoint retained?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
>>> wrote:
>>>
 Hi peeps,
 I am externalizing checkpoints in S3 for my Flink job and I retain them
 on cancellation. However, when I look into the S3 bucket where the
 checkpoints are stored, there is only one checkpoint at any point in time.
 Is this the default behavior of Flink, where older checkpoints are deleted
 when the current checkpoint completes? Here are a few listings. What are
 your thoughts on restoring an older state which is not the previous state?

 List contents of bucket at time 0

 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
 Last modified time: Wed Aug 21 22:17:23 GMT 2019
 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
 Last modified time: Wed Aug 21 22:17:24 GMT 2019

 List contents of bucket at time 1

 Printing last modified times
 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
 Last modified time: Wed Aug 21 22:23:24 GMT 2019
 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
 Last modified time: Wed Aug 21 22:23:24 GMT 2019

 Thanks,

 Vishwas




Re: Externalized checkpoints

2019-08-22 Thread Congxian Qiu
Hi, Vishwas

As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to specify
the maximum number of completed checkpoints to retain.
You can also refer to the externalized checkpoint cleanup type config[2]
for how the retained checkpoints are cleaned up.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
[2]
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints
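
For concreteness, a minimal sketch of how this looks in flink-conf.yaml
(the values and the S3 path below are illustrative placeholders, not taken
from this thread):

    # flink-conf.yaml -- illustrative sketch
    # Target directory for externalized checkpoint data and metadata.
    state.checkpoints.dir: s3://my-bucket/checkpoints
    # Retain the last 3 completed checkpoints instead of the default 1.
    state.checkpoints.num-retained: 3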
Best,
Congxian


On Thu, Aug 22, 2019 at 10:13 AM, Zhu Zhu wrote:

> Hi Vishwas,
>
> You can configure "state.checkpoints.num-retained" to specify the
> maximum number of checkpoints to retain.
> By default it is 1.
>
> Thanks,
> Zhu Zhu
>
> On Thu, Aug 22, 2019 at 6:48 AM, Vishwas Siravara wrote:
>
>> I am also using exactly-once checkpointing mode. I have a Kafka source
>> and sink, both of which support transactions, which should allow for
>> exactly-once processing. Is this the reason why there is only one
>> checkpoint retained?
>>
>> Thanks,
>> Vishwas
>>
>> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
>> wrote:
>>
>>> Hi peeps,
>>> I am externalizing checkpoints in S3 for my Flink job and I retain them
>>> on cancellation. However, when I look into the S3 bucket where the
>>> checkpoints are stored, there is only one checkpoint at any point in
>>> time. Is this the default behavior of Flink, where older checkpoints are
>>> deleted when the current checkpoint completes? Here are a few listings.
>>> What are your thoughts on restoring an older state which is not the
>>> previous state?
>>>
>>> List contents of bucket at time 0
>>>
>>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
>>> Last modified time: Wed Aug 21 22:17:23 GMT 2019
>>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
>>> Last modified time: Wed Aug 21 22:17:24 GMT 2019
>>>
>>> List contents of bucket at time 1
>>>
>>> Printing last modified times
>>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
>>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
>>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>>>
>>> Thanks,
>>>
>>> Vishwas
>>>
>>>


Re: Externalized checkpoints

2019-08-21 Thread Zhu Zhu
Hi Vishwas,

You can configure "state.checkpoints.num-retained" to specify the
maximum number of checkpoints to retain.
By default it is 1.

Thanks,
Zhu Zhu

On Thu, Aug 22, 2019 at 6:48 AM, Vishwas Siravara wrote:

> I am also using exactly-once checkpointing mode. I have a Kafka source and
> sink, both of which support transactions, which should allow for
> exactly-once processing. Is this the reason why there is only one
> checkpoint retained?
>
> Thanks,
> Vishwas
>
> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
> wrote:
>
>> Hi peeps,
>> I am externalizing checkpoints in S3 for my Flink job and I retain them
>> on cancellation. However, when I look into the S3 bucket where the
>> checkpoints are stored, there is only one checkpoint at any point in
>> time. Is this the default behavior of Flink, where older checkpoints are
>> deleted when the current checkpoint completes? Here are a few listings.
>> What are your thoughts on restoring an older state which is not the
>> previous state?
>>
>> List contents of bucket at time 0
>>
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
>> Last modified time: Wed Aug 21 22:17:23 GMT 2019
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
>> Last modified time: Wed Aug 21 22:17:24 GMT 2019
>>
>> List contents of bucket at time 1
>>
>> Printing last modified times
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>>
>> Thanks,
>>
>> Vishwas
>>
>>


Re: Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
I am also using exactly-once checkpointing mode. I have a Kafka source and
sink, both of which support transactions, which should allow for
exactly-once processing. Is this the reason why there is only one
checkpoint retained?
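
For reference, a minimal sketch of how such a job might be configured with
the 1.8/1.9-era DataStream API (the class name, interval, and job name are
illustrative; as the replies in this thread note, the number of retained
checkpoints is governed by "state.checkpoints.num-retained", not by the
checkpointing mode):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExactlyOnceCheckpointedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 60s with exactly-once guarantees.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            // Keep the completed checkpoint if the job is cancelled, so it
            // can be used for a manual restore later.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... add the Kafka source and the transactional Kafka sink
            // here; the sink would be created with Semantic.EXACTLY_ONCE.

            env.execute("exactly-once-checkpointed-job");
        }
    }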

Thanks,
Vishwas

On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
wrote:

> Hi peeps,
> I am externalizing checkpoints in S3 for my Flink job and I retain them on
> cancellation. However, when I look into the S3 bucket where the checkpoints
> are stored, there is only one checkpoint at any point in time. Is this the
> default behavior of Flink, where older checkpoints are deleted when the
> current checkpoint completes? Here are a few listings. What are your
> thoughts on restoring an older state which is not the previous state?
>
> List contents of bucket at time 0
>
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
> Last modified time: Wed Aug 21 22:17:23 GMT 2019
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
> Last modified time: Wed Aug 21 22:17:24 GMT 2019
>
> List contents of bucket at time 1
>
> Printing last modified times
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
> Last modified time: Wed Aug 21 22:23:24 GMT 2019
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>
> Thanks,
>
> Vishwas
>
>


Re: Externalized checkpoints and metadata

2018-04-26 Thread Sihua Zhou (周思华)


Hi Juan,


I think you are right, and there may be more than 3 companies implementing
different solutions for this... I created a ticket to address it here:
https://issues.apache.org/jira/browse/FLINK-9260. Hope this helps to reduce
others' redundant efforts on this (if it is eventually accepted by the
community).


Best Regards,
Sihua Zhou


On 04/26/2018 16:35, Juan Gentile <j.gent...@criteo.com> wrote:

Hello all,


Thank you all for your responses. I'll take a look at your code, Hao, and
probably implement something similar.

I'd like to ask, though, so as to know what we can expect from Flink in the
future, whether this issue will be addressed somehow, considering that we
already have 3 different companies implementing different (but similar)
solutions to the same problem.

Maybe we could think of adding this issue here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints ?


Thank you,

Juan G.


From: hao gao <hao.x@gmail.com>
Date: Wednesday, 25 April 2018 at 20:25
To: Juan Gentile <j.gent...@criteo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Oleksandr Nitavskyi 
<o.nitavs...@criteo.com>
Subject: Re: Externalized checkpoints and metadata


Hi Juan,


We modified the Flink code a little bit to change the checkpoint directory
structure so we can easily identify which checkpoint belongs to which job.

You can read my note or the PR:

https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed

https://github.com/BranchMetrics/flink/pull/6/files

Hope it helps


Thanks

Hao


2018-04-25 6:07 GMT-07:00 Juan Gentile <j.gent...@criteo.com>:

Hello,


We are trying to use externalized checkpoints, using RocksDB on Hadoop HDFS.

We would like to know the proper way to resume from a saved checkpoint, as
we are currently running many jobs in the same Flink cluster.


The problem is that when we want to restart the jobs and pass the metadata
file (or directory), there is 1 file per job, but the files are not easily
identifiable by name:

Example

/checkpoints/checkpoint_metadata-69053704a5ca

/checkpoints/checkpoint_metadata-c7c016909607


We are not using savepoints, and reading the documentation I see there are 2
ways to resume: one is passing the metadata file (not possible as we have
many jobs), and the other is passing the directory, but by default it looks
for a _metadata file, which doesn't exist.


Thank you,

Juan G.






--

Thanks

 - Hao 

Re: Externalized checkpoints and metadata

2018-04-26 Thread Juan Gentile
Hello all,

Thank you all for your responses. I'll take a look at your code, Hao, and
probably implement something similar.
I'd like to ask, though, so as to know what we can expect from Flink in the
future, whether this issue will be addressed somehow, considering that we
already have 3 different companies implementing different (but similar)
solutions to the same problem.
Maybe we could think of adding this issue here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints ?

Thank you,
Juan G.

From: hao gao <hao.x@gmail.com>
Date: Wednesday, 25 April 2018 at 20:25
To: Juan Gentile <j.gent...@criteo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Oleksandr Nitavskyi 
<o.nitavs...@criteo.com>
Subject: Re: Externalized checkpoints and metadata

Hi Juan,

We modified the Flink code a little bit to change the checkpoint directory
structure so we can easily identify which checkpoint belongs to which job.
You can read my note or the PR:
https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed
https://github.com/BranchMetrics/flink/pull/6/files
Hope it helps

Thanks
Hao

2018-04-25 6:07 GMT-07:00 Juan Gentile <j.gent...@criteo.com>:
Hello,

We are trying to use externalized checkpoints, using RocksDB on Hadoop HDFS.
We would like to know the proper way to resume from a saved checkpoint, as
we are currently running many jobs in the same Flink cluster.

The problem is that when we want to restart the jobs and pass the metadata
file (or directory), there is 1 file per job, but the files are not easily
identifiable by name:
Example
/checkpoints/checkpoint_metadata-69053704a5ca
/checkpoints/checkpoint_metadata-c7c016909607

We are not using savepoints, and reading the documentation I see there are 2
ways to resume: one is passing the metadata file (not possible as we have
many jobs), and the other is passing the directory, but by default it looks
for a _metadata file, which doesn't exist.

Thank you,
Juan G.



--
Thanks
 - Hao
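
On the resume step discussed above: once the right metadata file for a job
has been identified, it can be passed to the Flink CLI with the -s
(--fromSavepoint) flag, which also accepts externalized checkpoint paths. A
minimal sketch, where the jar name is a placeholder and the metadata path
is one of the examples from this thread:

    bin/flink run -s /checkpoints/checkpoint_metadata-69053704a5ca myJob.jar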


Re: Externalized checkpoints and metadata

2018-04-25 Thread hao gao
Hi Juan,

We modified the Flink code a little bit to change the checkpoint directory
structure so we can easily identify which checkpoint belongs to which job.
You can read my note or the PR:
https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed
https://github.com/BranchMetrics/flink/pull/6/files
Hope it helps

Thanks
Hao

2018-04-25 6:07 GMT-07:00 Juan Gentile :

> Hello,
>
>
>
> We are trying to use externalized checkpoints, using RocksDB on Hadoop
> HDFS.
>
> We would like to know the proper way to resume from a saved checkpoint,
> as we are currently running many jobs in the same Flink cluster.
>
>
>
> The problem is that when we want to restart the jobs and pass the metadata
> file (or directory), there is 1 file per job, but the files are not easily
> identifiable by name:
>
> Example
>
> /checkpoints/checkpoint_metadata-69053704a5ca
>
> /checkpoints/checkpoint_metadata-c7c016909607
>
>
>
> We are not using savepoints, and reading the documentation I see there are
> 2 ways to resume: one is passing the metadata file (not possible as we
> have many jobs), and the other is passing the directory, but by default it
> looks for a _metadata file, which doesn't exist.
>
>
>
> Thank you,
>
> Juan G.
>



-- 
Thanks
 - Hao


Re: Externalized Checkpoints vs Periodic Checkpoints

2017-02-02 Thread Till Rohrmann
Hi Yassine,

a periodic checkpoint is a checkpoint that is triggered periodically by
Flink. The checkpoint itself can have multiple properties, and one of them
is whether the checkpoint is externalized or not.

An externalized checkpoint is a checkpoint for which Flink writes the meta
information into a target directory. In contrast, for a non-externalized
checkpoint Flink stores the checkpoint meta information only in memory. The
former has the advantage that you don't lose the checkpoints if you shut
down your cluster. They behave similarly to savepoints, and in fact
savepoints are externalized checkpoints with some more properties.

At the moment, Flink's checkpoint coordinator only retains the last
successfully completed checkpoint. This means that whenever a new
checkpoint completes, the previously completed checkpoint will be
discarded. This also applies to externalized checkpoints.
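
To make the two properties concrete, a minimal sketch of the corresponding
Flink 1.2-era API calls, as they would appear in a job's main method (the
interval and the retention mode are illustrative):

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Periodic checkpointing: Flink triggers a checkpoint every 10s; by
    // default the checkpoint meta information is kept only in memory and
    // is lost when the cluster shuts down.
    env.enableCheckpointing(10_000);

    // Externalizing the checkpoints additionally writes the meta
    // information to the configured target directory, so a completed
    // checkpoint survives a cluster shutdown and can be restored from,
    // much like a savepoint.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);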

Cheers,
Till

On Wed, Feb 1, 2017 at 2:03 PM, Yassine MARZOUGUI  wrote:

> Hi all,
>
> Could someone clarify the difference between externalized checkpoints[1]
> and regular periodic checkpoints[2]?
> Moreover, I have a question regarding the retention of checkpoints: for
> regular checkpoints, does the latest checkpoint discard the previous ones?
> If yes, is that also the case for externalized checkpoints? Thank you.
>
> Best,
> Yassine
>
> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html
> [2] : https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/checkpointing.html
>


Re: Externalized Checkpoints vs Periodic Checkpoints

2017-02-02 Thread Yassine MARZOUGUI
Thank you Till for the clarification, that was helpful.

Best,
Yassine

2017-02-02 15:31 GMT+01:00 Till Rohrmann :

> Hi Yassine,
>
> a periodic checkpoint is a checkpoint that is triggered periodically
> by Flink. The checkpoint itself can have multiple properties, and one of
> them is whether the checkpoint is externalized or not.
>
> An externalized checkpoint is a checkpoint for which Flink writes the meta
> information into a target directory. In contrast, for a non-externalized
> checkpoint Flink stores the checkpoint meta information only in memory.
> The former has the advantage that you don't lose the checkpoints if you
> shut down your cluster. They behave similarly to savepoints, and in fact
> savepoints are externalized checkpoints with some more properties.
>
> At the moment, Flink's checkpoint coordinator only retains the last
> successfully completed checkpoint. This means that whenever a new
> checkpoint completes, the previously completed checkpoint will be
> discarded. This also applies to externalized checkpoints.
>
> Cheers,
> Till
>
> On Wed, Feb 1, 2017 at 2:03 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> Could someone clarify the difference between externalized checkpoints[1]
>> and regular periodic checkpoints[2]?
>> Moreover, I have a question regarding the retention of checkpoints: for
>> regular checkpoints, does the latest checkpoint discard the previous
>> ones? If yes, is that also the case for externalized checkpoints? Thank
>> you.
>>
>> Best,
>> Yassine
>>
>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html
>> [2] : https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/checkpointing.html
>>
>
>