Re: ProcessFunction collect and close, when to use?

2019-11-29 Thread shuwen zhou
Thank you Jiayi, that helps a lot!

On Fri, 29 Nov 2019 at 13:44, bupt_ljy  wrote:

> Hi Shuwen,
>
>
> > When should close() be called? After every element is processed? Or
> on ProcessFunction.close()? Or never?
>
>
> IMO, the #close() function is used to manage the lifecycle of the #Collector
> itself, not of a single element. I think it should not be called in a user
> function unless you have some special use case (no use case comes to my
> mind).
>
>
> > If it's been closed already, can the collector collect() any more data?
>
>
> No. If it’s closed, it usually means the writer is closed, or maybe the
> operator is closed.
>
>
> > If processElement receives a message but discards it and does not call
> collect(), will this block the checkpoint barrier until the next
> element is emitted by collect() ?
>
>
> No.
>
>
>
> Best,
>
> Jiayi Liao
>
>
>
>  Original Message
> *Sender:* shuwen zhou
> *Recipient:* user
> *Date:* Friday, Nov 29, 2019 12:29
> *Subject:* ProcessFunction collect and close, when to use?
>
> Hi Community,
> In the ProcessFunction class, in the processElement function, there is a
> Collector that has two methods: collect() and close(). I would like to know:
>
> 1. When should close() be called? After every element is processed? On
> ProcessFunction.close()? Or never? If it has been closed
> already, can the collector collect() any more data?
> 2. If processElement receives a message but discards it and does not
> call collect(), will this block the checkpoint barrier until the next
> element is emitted by collect() ?
>
>
> --
> Best Wishes,
> Shuwen Zhou
>
>

-- 
Best Wishes,
Shuwen Zhou


ProcessFunction collect and close, when to use?

2019-11-28 Thread shuwen zhou
Hi Community,
In the ProcessFunction class, in the processElement function, there is a
Collector that has two methods: collect() and close(). I would like to know:

1. When should close() be called? After every element is processed? On
ProcessFunction.close()? Or never? If it has been closed
already, can the collector collect() any more data?
2. If processElement receives a message but discards it and does not call
collect(), will this block the checkpoint barrier until the next element
is emitted by collect() ?


-- 
Best Wishes,
Shuwen Zhou
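The semantics asked about above can be sketched with a simplified Python stand-in for Flink's Java Collector (this is illustrative only, not the real org.apache.flink.util.Collector API; class and method bodies are assumptions):

```python
# Simplified stand-in for Flink's Collector (illustrative, not the real API).
class Collector:
    def __init__(self):
        self.closed = False
        self.emitted = []

    def collect(self, record):
        # Called once per output record; may be called many times per input.
        if self.closed:
            raise RuntimeError("collector is closed; cannot emit")
        self.emitted.append(record)

    def close(self):
        # Managed by the framework when the operator shuts down;
        # user code normally never calls this.
        self.closed = True


def process_element(value, out):
    # A user function emits zero or more records per input element
    # and never closes the collector itself. Discarding an element
    # (no collect() call) does not block checkpoint barriers.
    if value % 2 == 0:
        out.collect(value * 10)


out = Collector()
for v in [1, 2, 3, 4]:
    process_element(v, out)
print(out.emitted)  # [20, 40]
```

The point of the sketch: collect() is a per-record call, while close() belongs to the collector's lifecycle, which the framework owns.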


Re: Cron style for checkpoint

2019-11-21 Thread shuwen zhou
Hi Yun and Congxian,
I would actually like checkpoints to avoid being triggered at certain
times. Checkpointing would remain a system mechanism; it would just not be
triggered during a certain range of time.
Waiting for a checkpoint to time out still wastes CPU and disk I/O
resources, since it was triggered anyway. I would like it not to be
triggered in the first place.
I suppose a cron style would not break the checkpoint system mechanism.
A savepoint, on the other hand, is not an incremental update; triggering a
savepoint every 10 minutes would waste a lot of disk space, and another
script would be required to remove outdated savepoints. I suppose
savepoints are meant for upgrade/restart scenarios.
A cron-style checkpoint time config would provide a lot of flexibility.
Thanks.
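The scheduling policy being requested could be sketched as follows (a hypothetical helper, not an existing Flink feature or configuration):

```python
# Sketch of a cron-style checkpoint trigger policy: only allow checkpoints
# at the listed minutes, keeping them away from the HH:30 traffic peak.
# Hypothetical helper, not an existing Flink feature.
ALLOWED_MINUTES = {15, 45}  # from a cron spec like "15,45 * * * *"

def should_trigger_checkpoint(minute, checkpoint_in_progress):
    """Return True if a checkpoint may be triggered at this minute."""
    if checkpoint_in_progress:
        # A config value could decide whether to trigger anyway or skip;
        # here we skip, as suggested in the thread.
        return False
    return minute in ALLOWED_MINUTES

print(should_trigger_checkpoint(30, False))  # False: the peak minute
print(should_trigger_checkpoint(15, False))  # True
print(should_trigger_checkpoint(45, True))   # False: one already running
```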


On Thu, 21 Nov 2019 at 16:28, Yun Tang  wrote:

> Hi Shuwen
>
>
>
> Conceptually, checkpointing in Flink behaves more like a system mechanism
> to achieve fault tolerance and is transparent to users. On the other hand,
> a savepoint in Flink behaves more like a user-controlled operation. Could
> savepoints not satisfy your demand for a crontab?
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Congxian Qiu 
> *Date: *Thursday, November 21, 2019 at 2:27 PM
> *To: *shuwen zhou 
> *Cc: *Jiayi Liao , dev , user <
> user@flink.apache.org>
> *Subject: *Re: Cron style for checkpoint
>
>
>
> Hi
>
>
>
> Currently, Flink does not support such a feature. From what you describe,
> would setting an appropriate timeout for checkpoints solve your problem?
>
>
> Best,
>
> Congxian
>
>
>
>
>
> shuwen zhou  于2019年11月21日周四 下午12:06写道:
>
> Hi Jiayi,
>
> It would be great if Flink could have a user-defined interface for users
> to implement in order to control checkpoint behavior, at least for
> time-related behavior.
>
> I brought up a wish on JIRA [1]; I hope it is described clearly enough.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14884
>
>
>
>
>
> On Thu, 21 Nov 2019 at 11:40, Jiayi Liao  wrote:
>
> Hi Shuwen,
>
>
>
> As far as I know, Flink only supports checkpoints with a fixed interval.
>
>
>
> However, I think a flexible mechanism for triggering checkpoints is worth
> working on, at least from my perspective. And it may not only be a cron
> style. In our business scenario, the data traffic usually reaches the peak
> of the day after 20:00, during which we want to increase the interval of
> checkpoints; otherwise it introduces more disk and network IO.
>
>
>
> Just want to share something about this :)
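The idea of widening the interval during peak hours could be sketched like this (a hypothetical policy function, not an existing Flink API; the interval values are made up):

```python
# Sketch of a time-dependent checkpoint interval: use a longer interval
# during the evening traffic peak (after 20:00) to reduce disk/network IO.
# Hypothetical policy, not an existing Flink configuration.
BASE_INTERVAL_MIN = 10
PEAK_INTERVAL_MIN = 30

def checkpoint_interval_minutes(hour):
    """Longer interval during the 20:00-23:59 peak, base interval otherwise."""
    return PEAK_INTERVAL_MIN if hour >= 20 else BASE_INTERVAL_MIN

print(checkpoint_interval_minutes(12))  # 10
print(checkpoint_interval_minutes(21))  # 30
```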
>
>
>
>
>
> Best,
>
> Jiayi Liao
>
>
>
>
> At 2019-11-21 10:20:47, "shuwen zhou"  wrote:
>
> >Hi Community,
>
> >I would like to know if there is an existing feature to support cron-style
>
> >checkpoint?
>
> >The case is, our data traffic is huge at HH:30 each hour. We don't want
>
> >checkpoints to fall in that range of time. A cron like 15,45 * * * * to set
>
> >for checkpoints would be nice. If a checkpoint is already in progress when
>
> >the minute is 15 or 45, a config value could decide whether to trigger a new
>
> >checkpoint or skip it.
>
> >
>
> >--
>
> >Best Wishes,
>
> >Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
>
>
>
>
>
>
>
>
>
> --
>
> Best Wishes,
>
> Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
>
>
>
>

-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>


Re: Cron style for checkpoint

2019-11-20 Thread shuwen zhou
Hi Jiayi,
It would be great if Flink could have a user-defined interface for users to
implement in order to control checkpoint behavior, at least for
time-related behavior.
I brought up a wish on JIRA [1]; I hope it is described clearly enough.

[1] https://issues.apache.org/jira/browse/FLINK-14884


On Thu, 21 Nov 2019 at 11:40, Jiayi Liao  wrote:

> Hi Shuwen,
>
>
> As far as I know, Flink only supports checkpoints with a fixed interval.
>
>
> However, I think a flexible mechanism for triggering checkpoints is worth
> working on, at least from my perspective. And it may not only be a cron
> style. In our business scenario, the data traffic usually reaches the peak
> of the day after 20:00, during which we want to increase the interval of
> checkpoints; otherwise it introduces more disk and network IO.
>
>
> Just want to share something about this :)
>
>
>
> Best,
>
> Jiayi Liao
>
>
> At 2019-11-21 10:20:47, "shuwen zhou"  wrote:
> >Hi Community,
> >I would like to know if there is an existing feature to support cron-style
> >checkpoints?
> >The case is, our data traffic is huge at HH:30 each hour. We don't want
> >checkpoints to fall in that range of time. A cron like 15,45 * * * * to set
> >for checkpoints would be nice. If a checkpoint is already in progress when
> >the minute is 15 or 45, a config value could decide whether to trigger a new
> >checkpoint or skip it.
> >
> >--
> >Best Wishes,
> >Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
>
>
>
>
>


-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>


Cron style for checkpoint

2019-11-20 Thread shuwen zhou
Hi Community,
I would like to know if there is an existing feature to support cron-style
checkpoints?
The case is, our data traffic is huge at HH:30 each hour. We don't want
checkpoints to fall in that range of time. A cron like 15,45 * * * * to set
for checkpoints would be nice. If a checkpoint is already in progress when
the minute is 15 or 45, a config value could decide whether to trigger a new
checkpoint or skip it.

-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>


Fwd: RocksDB state on HDFS seems not being cleaned up

2019-11-17 Thread shuwen zhou
Forwarding to the user group again, since the mail server rejected it last time

-- Forwarded message -
From: shuwen zhou 
Date: Wed, 13 Nov 2019 at 13:33
Subject: Re: RocksDB state on HDFS seems not being cleaned up
To: Yun Tang 
Cc: user 


Hi Yun,
After my investigation, I found out the files are not orphan files; they
are still recorded in the latest checkpoint's _metadata file.
I looked through the API you mentioned,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html,
and it seems the state that can be accessed is limited to user-defined
state. I am thinking that the outdated state might belong to a window
reduce state, so I would like to access the window reduce state. It seems
this API cannot provide such functionality, does it?
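The orphan-file check under discussion, comparing the files referenced by the latest _metadata against the files actually present under shared/, can be sketched as follows (the file names below are made up for illustration):

```python
# Sketch of the orphan-file check: any file present under the checkpoint's
# shared/ directory but not referenced by the latest _metadata would be an
# orphan. File names are illustrative only.
referenced_in_metadata = {
    "shared/e9e10c8a-6d73-48e4-9e17-45838d276b03",
    "shared/1b2c3d4e-0000-1111-2222-333344445555",
}
present_in_shared_dir = {
    "shared/e9e10c8a-6d73-48e4-9e17-45838d276b03",
    "shared/1b2c3d4e-0000-1111-2222-333344445555",
    "shared/deadbeef-aaaa-bbbb-cccc-ddddeeeeffff",  # not referenced
}

orphans = present_in_shared_dir - referenced_in_metadata
print(sorted(orphans))  # ['shared/deadbeef-aaaa-bbbb-cccc-ddddeeeeffff']
```

A real check would list the shared/ directory on HDFS and extract the referenced paths from the checkpoint metadata; the set difference is the orphan candidates.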

On Thu, 7 Nov 2019 at 18:08, Yun Tang  wrote:

> Yes, just sum all file sizes within the checkpoint metadata to get the full
> checkpoint size (this would omit some byte stream state handles, but is
> nearly accurate).
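The summation suggested here can be sketched as follows (paths and sizes are made up; a real implementation would parse the _metadata file and stat each referenced file on HDFS):

```python
# Sketch of computing a full checkpoint's size by summing the sizes of all
# files its metadata references. Paths and byte counts are illustrative.
referenced_files = {
    "shared/e9e10c8a-6d73-48e4-9e17-45838d276b03": 1_500_000_000,
    "shared/1b2c3d4e-0000-1111-2222-333344445555": 2_500_000_000,
    "chk-172/operator-state-0001": 10_000_000,
}

full_size_bytes = sum(referenced_files.values())
print(full_size_bytes)  # 4010000000
```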
>
>
>
> BTW, I think the user mailing list is a better place for this email
> thread; I have already sent this mail to the user mailing list.
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *shuwen zhou 
> *Date: *Thursday, November 7, 2019 at 12:02 PM
> *To: *Yun Tang 
> *Cc: *dev , Till Rohrmann 
> *Subject: *Re: RocksDB state on HDFS seems not being cleaned up
>
>
>
> Hi Yun,
>
> Thank you for your detailed explanation; it gives me a lot to research. I
> think:
>
> 1. I should try to reduce *state.checkpoints.num-retained*, maybe
> to 3, which could decrease the size of the shared folder.
>
> 2. Does Flink 1.9.0 have the possibility of orphan files? It seems the
> answer is yes, maybe. I could use the State Processor API you mentioned to
> figure it out and get back to you.
>
> 3. I had a look at the file
> /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata;
> there are a lot of file names
> like 
> hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03.
> Summing those files' sizes up gives the total size of each checkpoint, am I correct?
>
> 4. My checkpoint interval is 16 minutes.
>
>
>
>
>
>
>
>
>
>
>
> On Wed, 6 Nov 2019 at 15:57, Yun Tang  wrote:
>
> Hi Shuwen
>
>
>
> You just have 10 “chk-” folders, as expected, and when subsuming
> checkpoints, the “chk-” folder is removed after we successfully
> remove the shared state [1]. That is to say, I think you might not have too
> many orphan state files left. To verify this, you could use the State
> Processor API [2] to load your checkpoints and compare all the files under
> the “shared” folder to see whether there are too many orphan files. If
> there are, we might consider the custom compaction filter feature of
> FRocksDB.
>
>
>
> Secondly, your judgment of “20GB each checkpoint” might not be accurate:
> when RocksDB incremental checkpointing is enabled, the UI shows only the
> incremental size [3]. I suggest you count the sizes of the files within
> your checkpoint metadata to know the accurate size of each checkpoint.
>
>
>
> Last but not least, RocksDB's compaction filter feature that deletes
> expired data only takes effect during compaction [4]; I'm afraid you might
> need to look at your RocksDB LOG files to see the frequency of compaction
> on task managers. And I think the increasing size might be related to the
> interval of your checkpoints. What is the interval at which you execute
> checkpoints?
>
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> [3] https://issues.apache.org/jira/browse/FLINK-13390