Re: ProcessFunction collect and close, when to use?
Thank you Jiayi, that helps a lot!

On Fri, 29 Nov 2019 at 13:44, bupt_ljy wrote:

> Hi Shuwen,
>
> > When to call close()? After every element is processed? On
> > ProcessFunction.close()? Or never?
>
> IMO, the #close() function manages the lifecycle of the #Collector
> rather than of a single element. I think it should not be called in a
> user function unless you have some special use case (no use case comes
> to my mind).
>
> > If it's been closed already, can the collector collect() any more data?
>
> No. If it's closed, it usually means the writer is closed, or maybe the
> operator is closed.
>
> > If processElement receives a message but decides to discard it and
> > does not call collect(), will this block the checkpoint barrier until
> > the next element is emitted by collect()?
>
> No.
>
> Best,
> Jiayi Liao
>
> Original Message
> Sender: shuwen zhou
> Recipient: user
> Date: Friday, Nov 29, 2019 12:29
> Subject: ProcessFunction collect and close, when to use?
>
> Hi Community,
> In the ProcessFunction class's processElement function, there is a
> Collector that has two methods: collect() and close(). I would like to
> know:
>
> 1. When to call close()? After every element is processed? On
> ProcessFunction.close()? Or never? If it's been closed already, can the
> collector collect() any more data?
> 2. If processElement receives a message but decides to discard it and
> does not call collect(), will this block the checkpoint barrier until
> the next element is emitted by collect()?
>
> --
> Best Wishes,
> Shuwen Zhou

--
Best Wishes,
Shuwen Zhou
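The guidance above (collect() zero or more times per element, never call close() yourself) can be sketched as follows. This is a simplified, self-contained illustration: the nested Collector interface is a stand-in for org.apache.flink.util.Collector, and processElement mimics a filtering ProcessFunction; it is not Flink's actual runtime code.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorSketch {
    // Simplified stand-in for org.apache.flink.util.Collector, just to
    // illustrate the lifecycle; this is NOT Flink's actual class.
    interface Collector<T> {
        void collect(T record);
        void close();
    }

    // Mimics ProcessFunction#processElement: it may call collect() zero or
    // more times per input, but never close() -- the runtime owns the
    // collector's lifecycle and closes it when the operator shuts down.
    static void processElement(int value, Collector<Integer> out) {
        if (value % 2 == 0) {
            out.collect(value);  // emit only even values; odd ones are dropped
        }
        // a dropped element simply makes no collect() call; nothing blocks
    }

    static List<Integer> runExample() {
        List<Integer> emitted = new ArrayList<>();
        Collector<Integer> out = new Collector<Integer>() {
            public void collect(Integer record) { emitted.add(record); }
            public void close() { /* called by the runtime, not user code */ }
        };
        for (int v : new int[] {1, 2, 3, 4}) {
            processElement(v, out);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runExample());  // [2, 4]
    }
}
```

Note how the discarded odd elements neither call collect() nor close(); as Jiayi answered, this does not block checkpoint barriers.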
ProcessFunction collect and close, when to use?
Hi Community,
In the ProcessFunction class's processElement function, there is a Collector that has two methods: collect() and close(). I would like to know:

1. When to call close()? After every element is processed? On ProcessFunction.close()? Or never? If it's been closed already, can the collector collect() any more data?
2. If processElement receives a message but decides to discard it and does not call collect(), will this block the checkpoint barrier until the next element is emitted by collect()?

--
Best Wishes,
Shuwen Zhou
Re: Cron style for checkpoint
Hi Yun and Congxian,
I would actually want checkpoints to avoid being triggered at a certain time. Checkpointing would still remain a system mechanism; it would just avoid being triggered during a certain range of time. Waiting for the checkpoint to time out still wastes CPU and disk IO resources, since it was triggered in the first place; I would like it not to be triggered at all. I suppose a cron-style config would not break the checkpoint's system mechanism.
Savepoints, on the other hand, are not incremental; triggering a savepoint every 10 minutes would waste a lot of disk space, and another script would be required to remove outdated savepoints. I suppose savepoints are meant for upgrade/restart scenarios.
A cron-style checkpoint time config would provide a lot of flexibility. Thanks.

On Thu, 21 Nov 2019 at 16:28, Yun Tang wrote:

> Hi Shuwen
>
> Conceptually, checkpoints in Flink behave more like a system mechanism to
> achieve fault tolerance, transparent to users. Savepoints, on the other
> hand, behave more like a user-controlled action; can savepoints not
> satisfy your demand for a crontab?
>
> Best
> Yun Tang
>
> From: Congxian Qiu
> Date: Thursday, November 21, 2019 at 2:27 PM
> To: shuwen zhou
> Cc: Jiayi Liao, dev, user <user@flink.apache.org>
> Subject: Re: Cron style for checkpoint
>
> Hi
>
> Currently, Flink does not support such a feature. From what you describe,
> would setting an appropriate timeout for checkpoints solve your problem?
>
> Best,
> Congxian
>
> shuwen zhou 于2019年11月21日周四 下午12:06写道:
>
> Hi Jiayi,
> It would be great if Flink could have a user-defined interface for users
> to implement to control checkpoint behavior, at least for time-related
> behavior.
> I brought up a wish on JIRA [1]; perhaps it is described clearly enough.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14884
>
> On Thu, 21 Nov 2019 at 11:40, Jiayi Liao wrote:
>
> Hi Shuwen,
>
> As far as I know, Flink only supports checkpointing with a fixed interval.
>
> However, I think a more flexible mechanism for triggering checkpoints is
> worth working on, at least from my perspective, and it may not only be
> cron style. In our business scenario, the data traffic usually reaches
> the peak of the day after 20:00, during which we want to increase the
> checkpoint interval, otherwise it introduces more disk and network IO.
>
> Just want to share something about this :)
>
> Best,
> Jiayi Liao
>
> At 2019-11-21 10:20:47, "shuwen zhou" wrote:
> >Hi Community,
> >I would like to know if there is an existing function to support
> >cron-style checkpoints.
> >The case is, our data traffic is huge at HH:30 each hour. We don't want
> >a checkpoint to fall in that range of time. A cron like "15,45 * * * *"
> >for checkpoints would be nice. If a checkpoint is already in progress
> >when the minute is 15 or 45, there would be a config value to either
> >trigger a new checkpoint or pass.
> >
> >--
> >Best Wishes,
> >Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
>
> --
> Best Wishes,
> Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
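Flink has no built-in hook for this today, but Jiayi's peak-hour scenario could be expressed as a simple time-to-interval function that an external scheduler (or a future Flink feature) might consult. A minimal sketch; the 20:00 peak window comes from the thread, while the interval values themselves are made-up examples, not Flink defaults:

```java
import java.time.LocalTime;

public class IntervalByHour {
    // Hypothetical helper: pick a longer checkpoint interval during the
    // evening traffic peak (after 20:00), to reduce disk and network IO
    // at peak time. Interval values are illustrative only.
    static long checkpointIntervalMillis(LocalTime now) {
        boolean peak = now.getHour() >= 20;          // peak window 20:00-23:59
        return peak ? 30 * 60_000L : 10 * 60_000L;   // 30 min vs 10 min
    }

    public static void main(String[] args) {
        System.out.println(checkpointIntervalMillis(LocalTime.of(12, 0)));  // 600000
        System.out.println(checkpointIntervalMillis(LocalTime.of(21, 0)));  // 1800000
    }
}
```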
Re: Cron style for checkpoint
Hi Jiayi,
It would be great if Flink could have a user-defined interface for users to implement to control checkpoint behavior, at least for time-related behavior.
I brought up a wish on JIRA [1]; perhaps it is described clearly enough.

[1] https://issues.apache.org/jira/browse/FLINK-14884

On Thu, 21 Nov 2019 at 11:40, Jiayi Liao wrote:

> Hi Shuwen,
>
> As far as I know, Flink only supports checkpointing with a fixed interval.
>
> However, I think a more flexible mechanism for triggering checkpoints is
> worth working on, at least from my perspective, and it may not only be
> cron style. In our business scenario, the data traffic usually reaches
> the peak of the day after 20:00, during which we want to increase the
> checkpoint interval, otherwise it introduces more disk and network IO.
>
> Just want to share something about this :)
>
> Best,
> Jiayi Liao
>
> At 2019-11-21 10:20:47, "shuwen zhou" wrote:
> >Hi Community,
> >I would like to know if there is an existing function to support
> >cron-style checkpoints.
> >The case is, our data traffic is huge at HH:30 each hour. We don't want
> >a checkpoint to fall in that range of time. A cron like "15,45 * * * *"
> >for checkpoints would be nice. If a checkpoint is already in progress
> >when the minute is 15 or 45, there would be a config value to either
> >trigger a new checkpoint or pass.
> >
> >--
> >Best Wishes,
> >Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
Cron style for checkpoint
Hi Community,
I would like to know if there is an existing function to support cron-style checkpoints.
The case is, our data traffic is huge at HH:30 each hour. We don't want a checkpoint to fall in that range of time. A cron like "15,45 * * * *" for checkpoints would be nice. If a checkpoint is already in progress when the minute is 15 or 45, there would be a config value to either trigger a new checkpoint or pass.

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
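The "15,45 * * * *" idea above amounts to a simple minute check that a trigger loop could apply before starting a checkpoint. A minimal sketch of that check; this is a hypothetical gate, not an actual Flink option:

```java
import java.time.LocalTime;

public class CheckpointWindow {
    // Hypothetical gate mirroring a "15,45 * * * *" cron: only allow a
    // checkpoint to start when the minute is 15 or 45, so it never falls
    // on the HH:30 traffic spike. Not an existing Flink config.
    static boolean mayTriggerCheckpoint(LocalTime now) {
        int m = now.getMinute();
        return m == 15 || m == 45;
    }

    public static void main(String[] args) {
        System.out.println(mayTriggerCheckpoint(LocalTime.of(10, 15))); // true
        System.out.println(mayTriggerCheckpoint(LocalTime.of(10, 30))); // false
    }
}
```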
Fwd: RocksDB state on HDFS seems not being cleaned up
Forwarding to the user group again since the mail server rejected it last time.

-- Forwarded message -
From: shuwen zhou
Date: Wed, 13 Nov 2019 at 13:33
Subject: Re: RocksDB state on HDFS seems not being cleaned up
To: Yun Tang
Cc: user

Hi Yun,
After my investigation, I found out the files are not orphan files; they are still recorded in the latest checkpoint's _metadata file.
I looked through the API you mentioned,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html,
and it seems the state that can be accessed is limited to user-defined state. I am thinking that the outdated state might belong to a window reduce state, thus I would like to access the window reduce state. It seems this API cannot provide such functionality, can it?

On Thu, 7 Nov 2019 at 18:08, Yun Tang wrote:

> Yes, just sum all file sizes within the checkpoint meta to get the full
> checkpoint size (this would omit some byte stream state handles, but is
> nearly accurate).
>
> BTW, I think the user mail list is the better place for this email
> thread; I already sent this mail to the user mail list.
>
> Best
> Yun Tang
>
> From: shuwen zhou
> Date: Thursday, November 7, 2019 at 12:02 PM
> To: Yun Tang
> Cc: dev, Till Rohrmann
> Subject: Re: RocksDB state on HDFS seems not being cleaned up
>
> Hi Yun,
> Thank you for your detailed explanation; it gives me a lot to research.
> I think:
>
> 1. I should try reducing "state.checkpoints.num-retained", maybe to 3,
> which could decrease the size of the shared folder.
> 2. Does Flink 1.9.0 have the possibility of orphan files? It seems the
> answer is "yes, maybe". I could use the state processor API you
> mentioned to figure it out and get back to you.
> 3. I had a look in the file
> /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; there are a
> lot of file names like
> hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03.
> Summing those files' sizes up gives the total size of each checkpoint,
> am I correct?
> 4. My checkpoint interval is 16 minutes.
>
> On Wed, 6 Nov 2019 at 15:57, Yun Tang wrote:
>
> Hi Shuwen
>
> Since you just have 10 "chk-" folders as expected, and when subsuming
> checkpoints the "chk-" folder is removed after we successfully remove
> shared state [1], I think you might not have too many orphan state
> files left. To ensure this, you could use the state processor API [2]
> to load your checkpoints and compare all the files under the "shared"
> folder to see whether there exist too many orphan files. If this is
> true, we might think of the custom compaction filter feature of
> FRocksDB.
>
> Secondly, your judgment of "20GB each checkpoint" might not be accurate
> when RocksDB incremental checkpointing is enabled; the UI shows only
> the incremental size [3]. I suggest you count the sizes of the files
> within your checkpoint meta to know the accurate checkpoint size for
> each checkpoint.
>
> Last but not least, RocksDB's compaction filter feature deletes expired
> data only during compaction [4]; I'm afraid you might need to look at
> your RocksDB LOG file to see the frequency of compaction on the task
> managers. And I think the increasing size might be related to the
> interval of your checkpoints; what is the interval when you execute
> checkpoints?
>
> [1] https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> [3] https://issues.apache.org/jira/browse/FLINK-13390
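Yun's suggestion of summing the file sizes referenced by a checkpoint can be approximated locally. The sketch below sums the sizes of all files under a directory, e.g. a local copy of a checkpoint's "shared" folder; it is an assumption-laden stand-in, since actually parsing _metadata or reading HDFS paths would require Flink's and Hadoop's libraries, which are not shown here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CheckpointSize {
    // Sum the sizes of all regular files under a directory, e.g. a local
    // copy of a checkpoint's "shared" folder. This approximates the full
    // checkpoint size the way the thread describes: by adding up the
    // referenced state files (byte-stream state handles are omitted).
    static long totalBytes(Path dir) throws IOException {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(p -> p.toFile().length())
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo on a temporary directory with two files of known size.
        Path dir = Files.createTempDirectory("shared");
        Files.write(dir.resolve("a"), new byte[100]);
        Files.write(dir.resolve("b"), new byte[250]);
        System.out.println(totalBytes(dir));  // 350
    }
}
```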