Hi,

if processing and checkpointing are stuck in RocksDB, this could indeed hint at 
a problem with your IO system. The comment about using EBS could be important, 
as it might be a bad idea from a performance point of view to run RocksDB on 
EBS; did you ever compare against running it on local SSDs?

Best,
Stefan

> On 09.03.2018 at 05:08, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Stefan, Sihua,
> 
> TL;DR: after the experiment, I found that this problem might not be about the 
> s3 filesystem or network IO with s3. It might be caused by rocksdb and IO 
> performance, but I still can't tell what caused this problem.
> 
> For more specific details, I have to introduce my flink application's design 
> and what I found in the experiment. The disks I used for EC2 are SSDs, but 
> they are EBS volumes.
> 
> For the application details, there is only one keyed ProcessFunction with a 
> ValueState holding a scala collection data type, which represents the counts 
> by event and date.
> This operator receives two types of messages: one is an event message and the 
> other is an overwrite-state message.
> When the operator receives an event message, it updates the corresponding 
> value by event and client time and emits the event to the next operator with 
> the "whole" collection; that's why I used ValueState and not MapState or 
> ListState.
> When the operator receives an overwrite-state message, it overwrites the 
> corresponding value in the state. This design lets us replay the state 
> constructed by new rules.
> Moreover, my key is something like a user cookie, and I have a timer callback 
> to remove out-dated state; for example, I only care about the state from the 
> last 6 months.
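> 
> A rough sketch of the operator, just for reference (the message types, field 
> names, the Map-based collection and the processing-time timer below are 
> simplified placeholders, not the real implementation):
> 
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> 
> // Two kinds of input messages; the stream is keyed by the user cookie upstream.
> sealed trait Msg { def cookie: String }
> case class Event(cookie: String, event: String, date: String) extends Msg
> case class OverwriteState(cookie: String, counts: Map[(String, String), Long]) extends Msg
> 
> case class Output(cookie: String, counts: Map[(String, String), Long])
> 
> class CountingFunction extends ProcessFunction[Msg, Output] {
> 
>   // The whole counting collection lives in a single ValueState.
>   private var countsState: ValueState[Map[(String, String), Long]] = _
> 
>   override def open(parameters: Configuration): Unit = {
>     countsState = getRuntimeContext.getState(
>       new ValueStateDescriptor("counts", createTypeInformation[Map[(String, String), Long]]))
>   }
> 
>   override def processElement(msg: Msg, ctx: ProcessFunction[Msg, Output]#Context,
>                               out: Collector[Output]): Unit = msg match {
>     case Event(cookie, event, date) =>
>       val counts = Option(countsState.value()).getOrElse(Map.empty[(String, String), Long])
>       val updated = counts.updated((event, date), counts.getOrElse((event, date), 0L) + 1L)
>       countsState.update(updated)
>       // Emit the "whole" collection downstream, which is why ValueState is used.
>       out.collect(Output(cookie, updated))
>       // Timer callback to drop out-dated state, e.g. roughly 6 months later.
>       ctx.timerService().registerProcessingTimeTimer(
>         ctx.timerService().currentProcessingTime() + 180L * 24 * 60 * 60 * 1000)
> 
>     case OverwriteState(_, counts) =>
>       // Overwrite the corresponding value when replaying state built by new rules.
>       countsState.update(counts)
>   }
> 
>   override def onTimer(timestamp: Long, ctx: ProcessFunction[Msg, Output]#OnTimerContext,
>                        out: Collector[Output]): Unit = {
>     // Remove the out-dated state for this key.
>     countsState.clear()
>   }
> }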
> 
> For the experiment, I tried to catch the first failure to find some clues, so 
> I extended the checkpoint interval to a long time and used savepoints. I know 
> a savepoint is not exactly the same as a checkpoint, but I guess the parts 
> that store the state and upload it to the remote filesystem are similar.
> After some savepoints were triggered, I found that the asynchronous part was 
> stuck in the full snapshot operation, and the time triggers on that machine 
> were also stuck and blocked the operator from processing elements.
> I recalled that I had replayed lots of state for 60 days within 4 ~ 5 hours, 
> and the first problem happened during that replay procedure. It was just a 
> coincidence that the callbacks for those replayed keys fired while I was 
> running the experiment.
> However, when I disabled all checkpoints and savepoints to observe whether the 
> massive number of keys accessing rocksdb got stuck, the problem disappeared. 
> Moreover, I rolled back to the original setting under which the checkpoint got 
> stuck, and the problem hasn't happened again yet.
> 
> In summary, I still can't tell where the problem happened, since the IO 
> performance didn't reach its limit and the problem couldn't be reproduced from 
> the same initial states.
> I decided to try out incremental checkpoints to reduce this risk. I will 
> reopen this thread with more information when the problem happens again. If 
> you have any suggestions about my implementation that might lead to problems, 
> or about this issue, please advise me.
> Thank you very much for taking the time to look into this issue!! = )
> 
> p.s. the attachment is about the experiment I mentioned above. I didn't 
> record the stack trace because the only difference was that the Time Trigger 
> threads were RUNNABLE and the operator was blocked.
> 
> Best Regards,
> Tony Wei
> 
> 
> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
> 
> I agree with you.
> 
> Best Regards,
> 
> Sihua Zhou
> 
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 15:34, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
> 
> You are right. The incremental checkpoint might relieve the machines from the 
> high cpu load and make the bad machines recover quickly, but I was wondering 
> why the first checkpoint failed by timeout. You can see that when the bad 
> machine recovered, the cpu load for each checkpoint was not so high, although 
> there were still peaks during each checkpoint. I think the high cpu load, 
> which might be caused by those timed-out checkpointing threads, is not the 
> root cause. I will use incremental checkpoints eventually, but I will decide 
> whether to change my persistence filesystem after we find the root cause or 
> stop the investigation and draw a conclusion in this mailing thread. What do 
> you think?
> 
> Best Regards,
> Tony Wei
> 
> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
> 
> Sorry for missing the cpu factor. I found that the "bad tm"'s cpu load is 
> much higher than the "good tm"'s, so I think it may also be a reason that 
> could lead to the timeout. Since you are using "full checkpoints", Flink needs 
> to iterate over all the records in RocksDB with some `if` checks; when the 
> state is huge, this is cpu costly. Let me try to explain the full checkpoint a 
> bit more; it contains two parts.
> 
> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint Duration 
> (sync)" on the checkpoint detail page.)
> 
> Part 2. Loop over the records of the snapshot, with some `if` checks to ensure 
> that the data is sent to s3 in key-group order. (This maps to the "Checkpoint 
> Duration (async)".)
> 
> So part 2 can be both cpu costly and network costly: if the cpu load is too 
> high, then sending data will slow down, because both happen in a single loop. 
> If cpu is the reason, this phenomenon will disappear if you use incremental 
> checkpoints, because they almost only send data to s3. All in all, trying out 
> incremental checkpoints is the best thing to do for now, I think.
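> 
> For reference, a minimal sketch of enabling incremental checkpoints with the 
> RocksDB backend in Flink 1.4 (the s3 path below is just a placeholder for the 
> real checkpoint URI):
> 
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // The second constructor argument turns on incremental checkpointing.
> env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true))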
> 
> Best Regards,
> Sihua Zhou
> 
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
> Sent to the wrong mailing list. Forwarding it to the correct one.
> 
> ---------- Forwarded message ----------
> From: Tony Wei <tony19920...@gmail.com>
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
> 
> 
> Hi Sihua,
> 
> Thanks a lot. I will try to find the problem in the machines' environment. 
> If you and Stefan have any new suggestions or thoughts, please advise me. 
> Thank you!
> 
> Best Regards,
> Tony Wei
> 
> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
> 
> I think both of the things you mentioned can lead to a bad network. But from 
> my side, I think it is more likely that the unstable network environment 
> caused the poor network performance itself, because as far as I know I can't 
> see a reason why flink would slow down the network that much (and even if it 
> does, the effect should not be that large).
> 
> Maybe Stefan could tell us more about that. ;)
> 
> Best Regards,
> Sihua Zhou
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
> 
> 
> Hi Tony,
> 
> About your question: an average end-to-end checkpoint latency of less than 
> 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is 
> determined by the max end-to-end latency (the slowest one); a checkpoint is 
> truly completed only after all tasks' checkpoints have completed.
> 
> Sorry for my poor expression. What I meant is the average duration of 
> "completed" checkpoints, so I guess there are some problems that make some 
> checkpoint subtasks take very long, even more than 10 mins.
>  
> 
> About the problem: after a second look at the info you provided, we can see 
> from the checkpoint detail picture that there is one task which took 4m20s to 
> transfer its snapshot (about 482M) to s3, and there are 4 other tasks that 
> didn't complete the checkpoint yet. And from bad_tm_pic.png vs 
> good_tm_pic.png, we can see that on the "bad tm" the network performance is 
> far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is 
> a problem; sometimes it fails to send 500M of data to s3 within 10 minutes. 
> (Maybe you need to check whether the network environment is stable.)
>  
> That is what concerns me, because I can't determine whether a stuck 
> checkpoint makes the network performance worse, or worse network performance 
> makes the checkpoint get stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't 
> mean the bad machine is always bad and the good machine is always good. See 
> the attachments.
> All of my TMs have hit this problem at least once from last weekend until now. 
> Some machines recovered by themselves and some recovered after I restarted 
> them.
> 
> Best Regards,
> Tony Wei
> 
> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
> 
> Hi Tony,
> 
> About your question: an average end-to-end checkpoint latency of less than 
> 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is 
> determined by the max end-to-end latency (the slowest one); a checkpoint is 
> truly completed only after all tasks' checkpoints have completed.
> 
> About the problem: after a second look at the info you provided, we can see 
> from the checkpoint detail picture that there is one task which took 4m20s to 
> transfer its snapshot (about 482M) to s3, and there are 4 other tasks that 
> didn't complete the checkpoint yet. And from bad_tm_pic.png vs 
> good_tm_pic.png, we can see that on the "bad tm" the network performance is 
> far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is 
> a problem; sometimes it fails to send 500M of data to s3 within 10 minutes. 
> (Maybe you need to check whether the network environment is stable.)
> 
> About the solution: I think incremental checkpoints can help you a lot, since 
> they only send the new data for each checkpoint. But you are right: if the 
> incremental state size is larger than 500M, it might cause the timeout problem 
> again (because of the bad network performance).
> 
> Best Regards,
> Sihua Zhou
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 13:02, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
> 
> Thanks for your suggestion. "Incremental checkpoint" is what I will try out 
> next, and I know it will give better performance. However, it might not solve 
> this issue completely, because as I said, the average end-to-end checkpoint 
> latency is currently less than 1.5 mins, which is far from my timeout 
> configuration. I believe incremental checkpoints will reduce the latency and 
> make this issue occur seldom, but I can't promise it won't happen again if my 
> state grows bigger in the future. Am I right?
> 
> Best Regards,
> Tony Wei 
> 
> 2018-03-06 10:55 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
> 
> Sorry for jumping in. One thing I want to point out is that from the log you 
> provided, it looks like you are using "full checkpoints". This means that the 
> state data that needs to be checkpointed and transferred to s3 will grow over 
> time, and even the first checkpoint performs slower than an incremental 
> checkpoint (because it needs to iterate over all the records in rocksdb using 
> the RocksDBMergeIterator). Maybe you can try out "incremental checkpoint"; it 
> could help you get better performance.
> 
> Best Regards,
> Sihua Zhou
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 10:34, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Stefan,
> 
> I see. That explains why the load on the machines grew. However, I don't 
> think it is the root cause that led to these consecutive checkpoint timeouts. 
> As I said in my first mail, the checkpointing process usually took 1.5 mins to 
> upload states, and this operator and the kafka consumer are the only two 
> operators that have state in my pipeline. In the best case, I should never 
> encounter a timeout problem caused only by lots of pending checkpointing 
> threads that have already timed out. Am I right?
> 
> Since these logs and stack traces were taken nearly 3 hours after the first 
> checkpoint timeout, I'm afraid that we can't actually find the root cause of 
> the first checkpoint timeout. Because we are preparing to put this pipeline 
> into production, I was wondering if you could help me find where the root 
> cause lies: bad machines, s3, the flink-s3-presto package, or the flink 
> checkpointing thread. It would be great if we could find it from the 
> information I provided, and a hypothesis based on your experience is welcome 
> as well. The most important thing is that I have to decide whether I need to 
> change my persistence filesystem or use another s3 filesystem package, 
> because the last thing I want to see is checkpoint timeouts happening very 
> often.
> 
> Thank you very much for all your advice.
> 
> Best Regards,
> Tony Wei
> 
> 2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> Hi,
> 
> thanks for all the info. I had a look into the problem and opened 
> https://issues.apache.org/jira/browse/FLINK-8871 to fix it. From your stack 
> trace, you can see that many checkpointing threads are running on your TM for 
> checkpoints that have already timed out, and I think this cascades and slows 
> down everything. It seems like the implementation of some features, such as 
> checkpoint timeouts and not failing tasks on checkpointing problems, 
> overlooked that we also need to properly communicate the checkpoint 
> cancellation to all tasks, which was not needed before.
> 
> Best,
> Stefan
> 
> 
>> On 05.03.2018 at 14:42, Tony Wei <tony19920...@gmail.com> wrote:
>> 
>> Hi Stefan,
>> 
>> Here is my checkpointing configuration.
>> 
>> Checkpointing Mode   Exactly Once
>> Interval     20m 0s
>> Timeout      10m 0s
>> Minimum Pause Between Checkpoints    0ms
>> Maximum Concurrent Checkpoints       1
>> Persist Checkpoints Externally       Enabled (delete on cancellation)
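>> 
>> For reference, those settings map roughly onto the following sketch (not the 
>> actual job code; the variable names are placeholders):
>> 
>> import java.util.concurrent.TimeUnit
>> import org.apache.flink.streaming.api.CheckpointingMode
>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> // 20-minute interval, exactly-once mode.
>> env.enableCheckpointing(TimeUnit.MINUTES.toMillis(20), CheckpointingMode.EXACTLY_ONCE)
>> val checkpointConf = env.getCheckpointConfig
>> checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10))
>> checkpointConf.setMinPauseBetweenCheckpoints(0)
>> checkpointConf.setMaxConcurrentCheckpoints(1)
>> // Externalized checkpoints, deleted on cancellation.
>> checkpointConf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
>> 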
>> Best Regards,
>> Tony Wei
>> 
>> 2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> Hi,
>> 
>> quick question: what is your exact checkpointing configuration? In 
>> particular, what is your value for the maximum parallel checkpoints and the 
>> minimum time interval to wait between two checkpoints?
>> 
>> Best,
>> Stefan
>> 
>> > On 05.03.2018 at 06:34, Tony Wei <tony19920...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Last weekend, my flink job's checkpoints started failing because of 
>> > timeouts. I have no idea what happened, but I collected some information 
>> > about my cluster and job. I hope someone can give me advice or hints about 
>> > the problem I encountered.
>> >
>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each 
>> > with 4 cores. These machines are ec2 spot instances. The job's parallelism 
>> > is set to 32, using rocksdb as the state backend and s3 presto as the 
>> > checkpoint file system.
>> > The state size is nearly 15gb and still grows day by day. Normally, it 
>> > takes 1.5 mins to finish the whole checkpoint process. The timeout 
>> > configuration is set to 10 mins.
>> >
>> > <chk_snapshot.png>
>> >
>> > As the picture shows, not every checkpoint subtask failed because of the 
>> > timeout, but every machine has at some point failed for all its subtasks 
>> > during last weekend. Some machines recovered by themselves and some 
>> > machines recovered after I restarted them.
>> >
>> > I recorded logs, stack traces and snapshots of machine status (CPU, IO, 
>> > Network, etc.) for both a good and a bad machine. If you need more 
>> > information, please let me know. Thanks in advance.
>> >
>> > Best Regards,
>> > Tony Wei.
>> > <bad_tm_log.log><bad_tm_pic.png><bad_tm_stack.log><good_tm_log.log><good_tm_pic.png><good_tm_stack.log>
>> 
>> 
> 
> 
> 
> 
> 
> 
> 
> 
> <backpressure.png><good_tm.png><bad_tm.png>
