Hi Stefan,

We prepared to run it on local SSDs yesterday. However, as I said, the
problem just disappeared. Of course we will switch to local SSDs, but
I'm afraid that I might not be able to reproduce the same situation in both
environments to compare the difference.

Best Regards,
Tony Wei.

2018-03-09 16:59 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> if processing and checkpointing are stuck in RocksDB, this could indeed
> hint at a problem with your IO system. The comment about using EBS could be
> important, as it might be a bad idea from a performance point of view to
> run RocksDB on EBS; did you ever compare against running it on local SSDs?
>
> Best,
> Stefan
>
> On 09.03.2018 at 05:08, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Stefan, Sihua,
>
> TL;DR: after the experiment, I found that this problem might not be about the
> s3 filesystem or network io with s3. It might be caused by rocksdb and io
> performance, but I still can't identify what caused this problem.
>
> To be more specific, I have to describe my flink application and what I
> found in the experiment. The disks I used for EC2 are SSDs, but they are
> EBS volumes.
>
> For the application detail, there is only one keyed ProcessFunction with
> ValueState holding a scala collection data type, which represents the counts
> by event and date.
> This operator will receive two types of message: one is an event message and
> the other is an overwrite state message.
> When the operator receives an event message, it updates the corresponding
> value by event and client time and emits the event to the next operator with
> the "whole" collection; that's why I used ValueState and not MapState or
> ListState.
> When the operator receives an overwrite state message, it overwrites the
> corresponding value in the state. This design lets us replay
> the state constructed by the new rules.
> Moreover, my key is something like a user cookie, and I have a timer
> callback to remove out-dated state; for example, I only care about
> the state from the last 6 months.
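
A minimal plain-Python sketch of the operator logic described above, for readers who want to see the three code paths side by side. It is not Flink API: `CountingState`, its method names, and the 180-day reading of "6 months" are assumptions made for illustration, and it assumes the timer drops individual out-dated entries rather than the whole key.

```python
from datetime import date, timedelta

RETENTION = timedelta(days=180)  # assumption: "6 months" approximated as 180 days


class CountingState:
    """Plain-Python stand-in for one key's ValueState (hypothetical, not Flink API)."""

    def __init__(self):
        self.counts = {}  # (event, day) -> count, i.e. the "whole" collection

    def on_event(self, event, day):
        # Update the count for this event and day, then emit the event
        # together with a copy of the whole collection.
        self.counts[(event, day)] = self.counts.get((event, day), 0) + 1
        return event, dict(self.counts)

    def on_overwrite(self, event, day, value):
        # Replace the stored value, e.g. when replaying state under new rules.
        self.counts[(event, day)] = value

    def on_timer(self, today):
        # Timer callback: drop entries older than the retention window.
        cutoff = today - RETENTION
        self.counts = {k: v for k, v in self.counts.items() if k[1] >= cutoff}


state = CountingState()
state.on_event("click", date(2018, 3, 1))
emitted = state.on_event("click", date(2018, 3, 1))
state.on_overwrite("click", date(2017, 1, 1), 7)
state.on_timer(date(2018, 3, 9))
```

Because every event rewrites and emits the whole collection, the value stored per key grows with the number of (event, date) pairs, which is one reason the size of each state access matters here.
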
>
> For the experiment, I tried to catch the first failure to find some
> clues, so I extended the checkpoint interval to a long time and used
> savepoints. I know a savepoint is not actually the same as a checkpoint, but
> I guess the parts that store state and upload it to the remote filesystem
> are similar.
> After some savepoints were triggered, I found that the asynchronous part was
> stuck in the full snapshot operation, and time triggers on that machine were
> also stuck and blocked the operator from processing elements.
> I recalled that I had replayed lots of state for 60 days in 4 ~ 5 hours,
> and the first problem happened during the replay procedure. It is just a
> coincidence that the callbacks for those keys that I replayed fired when
> I ran the experiment.
> However, when I tried disabling all checkpoints and savepoints to
> observe whether the massive number of keys accessing rocksdb would get stuck,
> I found the problem had disappeared. Moreover, I rolled back to the original
> setting under which checkpoints got stuck. The problem hasn't happened again
> yet.
>
> In summary, I still can't tell where the problem happened, since the io
> performance didn't hit the limit and the problem couldn't be reproduced
> from the same initial state.
> I have decided to try out incremental checkpoints to reduce this risk. I will
> reopen this thread with more information when the problem
> happens again. If you have any suggestions about my implementation that might
> lead to problems, or about this issue, please advise me.
> Thank you very much for taking the time to pay attention to this issue!! =)
>
> P.S. The attachment is about the experiment I mentioned above. I didn't
> record the stack trace because the only difference is that the Time Trigger's
> state was runnable and the operator was blocked.
>
> Best Regards,
> Tony Wei
>
>
> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
>
>> Hi Tony,
>>
>> I agree with you.
>>
>> Best Regards,
>>
>> Sihua Zhou
>>
>>
>> Sent from NetEase Mail Master
>>
>> On 03/6/2018 15:34, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>> You are right. The incremental checkpoint might relieve the machines from high
>> cpu load and make the bad machines recover quickly, but I was wondering
>> why the first checkpoint failed by timeout. You can see that when the bad
>> machine recovered, the cpu load for each checkpoint was not so high,
>> although there were still peaks during each checkpoint. I think the
>> high cpu load that might be caused by those timed-out checkpointing
>> threads is not the root cause. I will use incremental checkpoints
>> eventually, but I will decide whether to change my persistence filesystem
>> after we find the root cause or stop the investigation and draw a
>> conclusion in this mailing thread. What do you think?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
>>
>>> Hi Tony,
>>>
>>> Sorry for missing the cpu factor. I found that the "bad tm"'s cpu
>>> load is much higher than the "good tm"'s, so I think it may also be a
>>> reason that could lead to the timeout. Since you were using "full checkpoint",
>>> it needs to iterate over all the records in RocksDB with some `if` checks;
>>> when the state is huge this is cpu costly. Let me try to explain the full
>>> checkpoint a bit more; it contains two parts.
>>>
>>> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint
>>> Duration (sync)" on the checkpoint detail page.)
>>>
>>> Part 2. Loop over the records of the snapshot, along with some `if` checks
>>> to ensure that the data is sent to s3 in the order of the key groups. (This
>>> maps to the "Checkpoint Duration (async)".)
>>>
>>> So part 2 could be cpu costly and network costly. If the CPU load is too
>>> high, then sending data will slow down, because both happen in a single loop.
>>> If cpu is the reason, this phenomenon will disappear if you use incremental
>>> checkpoints, because they almost only send data to s3. All in all, for now
>>> trying out incremental checkpoints is the best thing to do, I think.
>>>
>>> Best Regards,
>>> Sihua Zhou
>>>
>>>
>>> Sent from NetEase Mail Master
>>>
>>> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Sent to the wrong mailing list. Forwarding it to the correct one.
>>>
>>> ---------- Forwarded message ----------
>>> From: Tony Wei <tony19920...@gmail.com>
>>> Date: 2018-03-06 14:43 GMT+08:00
>>> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
>>> To: 周思华 <summerle...@163.com>, Stefan Richter <
>>> s.rich...@data-artisans.com>
>>> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>>>
>>>
>>> Hi Sihua,
>>>
>>> Thanks a lot. I will try to track down the problem in the machines'
>>> environment. If you and Stefan have any new suggestions or thoughts, please
>>> advise me. Thank you!
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>>>
>>>> Hi Tony,
>>>>
>>>> I think the two things you mentioned can both lead to a bad network.
>>>> But from my side, I think it is more likely that the unstable network
>>>> env itself causes the poor network performance, because as far as
>>>> I know I can't find a reason why flink would slow down the
>>>> network so much (even if it does, the effect should not be that large).
>>>>
>>>> Maybe Stefan could tell more about that. ;)
>>>>
>>>> Best Regards,
>>>> Sihua Zhou
>>>>
>>>> Sent from NetEase Mail Master
>>>>
>>>> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Sihua,
>>>>
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> Regarding your question: an average end-to-end checkpoint latency of
>>>>> less than 1.5 mins doesn't mean that checkpoints won't time out. Indeed,
>>>>> it is determined by the max end-to-end latency (the slowest one); a
>>>>> checkpoint is truly completed only after all tasks' checkpoints have
>>>>> completed.
>>>>>
>>>>
>>>> Sorry for expressing that poorly. What I meant is the average duration of
>>>> "completed" checkpoints, so I guess there are some problems that make some
>>>> checkpoint subtasks take very long, even more than 10 mins.
>>>>
>>>>
>>>>>
>>>>> Regarding the problem: after a second look at the info you provided, we
>>>>> can see from the checkpoint detail picture that there is one task which
>>>>> took 4m20s to transfer its snapshot (about 482M) to s3 and there are 4
>>>>> other tasks that didn't complete the checkpoint yet. And from
>>>>> bad_tm_pic.png vs good_tm_pic.png, we can see that on the "bad tm" the
>>>>> network performance is far lower than on the "good tm" (-15 MB vs -50MB).
>>>>> So I guess the network is a problem; sometimes it failed to send 500M of
>>>>> data to s3 in 10 minutes. (Maybe you need to check whether the network env
>>>>> is stable.)
>>>>>
>>>>
>>>> That is what concerns me, because I can't determine whether the stuck
>>>> checkpoint makes network performance worse or worse network performance
>>>> makes the checkpoint get stuck.
>>>> Although I provided one "bad machine" and one "good machine", that
>>>> doesn't mean the bad machine is always bad and the good machine is always
>>>> good. See the attachments.
>>>> All of my TMs hit this problem at least once from last weekend until
>>>> now. Some machines recovered by themselves and some recovered after I
>>>> restarted them.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
>>>>
>>>>>
>>>>> Hi Tony,
>>>>>
>>>>> Regarding your question: an average end-to-end checkpoint latency of
>>>>> less than 1.5 mins doesn't mean that checkpoints won't time out. Indeed,
>>>>> it is determined by the max end-to-end latency (the slowest one); a
>>>>> checkpoint is truly completed only after all tasks' checkpoints have
>>>>> completed.
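
A small sketch of the average-versus-slowest point above. The per-subtask durations are made-up numbers chosen for illustration, not measurements from this job; only the 10 min timeout comes from the thread, and `end_to_end` and `times_out` are hypothetical helper names.

```python
TIMEOUT_S = 10 * 60  # the 10 min checkpoint timeout mentioned in this thread

# Hypothetical per-subtask durations in seconds, not measured values.
subtask_durations = [60, 70, 80, 90, 700]


def end_to_end(durations):
    # The checkpoint finishes only when its slowest subtask finishes.
    return max(durations)


def times_out(durations, timeout_s=TIMEOUT_S):
    return end_to_end(durations) > timeout_s


average = sum(subtask_durations) / len(subtask_durations)
```

Here the average is well under the timeout while a single slow subtask still pushes the whole checkpoint past it.
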
>>>>>
>>>>> Regarding the problem: after a second look at the info you provided, we
>>>>> can see from the checkpoint detail picture that there is one task which
>>>>> took 4m20s to transfer its snapshot (about 482M) to s3 and there are 4
>>>>> other tasks that didn't complete the checkpoint yet. And from
>>>>> bad_tm_pic.png vs good_tm_pic.png, we can see that on the "bad tm" the
>>>>> network performance is far lower than on the "good tm" (-15 MB vs -50MB).
>>>>> So I guess the network is a problem; sometimes it failed to send 500M of
>>>>> data to s3 in 10 minutes. (Maybe you need to check whether the network env
>>>>> is stable.)
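
The figures above can be turned into rough transfer rates. This is back-of-the-envelope arithmetic only: it assumes "M" means MB, that the roughly 15 GB of state mentioned in the first mail of this thread is spread evenly over the parallelism of 32, and `mb_per_s` is a hypothetical helper.

```python
def mb_per_s(size_mb, seconds):
    # Average transfer rate for a payload of size_mb sent in the given time.
    return size_mb / seconds


observed = mb_per_s(482, 4 * 60 + 20)  # the slow task: 482 MB in 4m20s
required = mb_per_s(500, 10 * 60)      # 500 MB within the 10 min timeout
per_subtask_mb = 15 * 1024 / 32        # ~15 GB of state over parallelism 32
```

Under these assumptions the per-subtask share comes to 480 MB, in line with the reported snapshot size. The slow task still averaged about 1.85 MB/s, so a subtask that misses the timeout must have averaged under about 0.83 MB/s, less than half of that.
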
>>>>>
>>>>> About the solution: I think incremental checkpoints can help you a lot;
>>>>> they will only send the new data on each checkpoint. But you are right: if
>>>>> the incremental state size is larger than 500M, it might cause the timeout
>>>>> problem again (because of the bad network performance).
>>>>>
>>>>> Best Regards,
>>>>> Sihua Zhou
>>>>>
>>>>> Sent from NetEase Mail Master
>>>>>
>>>>> On 03/6/2018 13:02, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Sihua,
>>>>>
>>>>> Thanks for your suggestion. "incremental checkpoint" is what I will
>>>>> try out next, and I know it will give better performance. However, it
>>>>> might not solve this issue completely, because as I said, the average
>>>>> end-to-end latency of checkpointing is less than 1.5 mins currently, and
>>>>> it is far from my timeout configuration. I believe "incremental checkpoint"
>>>>> will reduce the latency and make this issue occur less often, but I can't
>>>>> promise it won't happen again if my state grows bigger in the future.
>>>>> Am I right?
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2018-03-06 10:55 GMT+08:00 周思华 <summerle...@163.com>:
>>>>>
>>>>>> Hi Tony,
>>>>>>
>>>>>> Sorry for jumping in. One thing I want to point out is that from the log
>>>>>> you provided it looks like you are using "full checkpoint". This means
>>>>>> that the state data that needs to be checkpointed and transferred to s3
>>>>>> will grow over time, and even for the first checkpoint its performance is
>>>>>> slower than an incremental checkpoint (because it needs to iterate over
>>>>>> all the records from rocksdb using the RocksDBMergeIterator). Maybe you
>>>>>> can try out "incremental checkpoint"; it could help you get better
>>>>>> performance.
>>>>>>
>>>>>> Best Regards,
>>>>>> Sihua Zhou
>>>>>>
>>>>>> Sent from NetEase Mail Master
>>>>>>
>>>>>> On 03/6/2018 10:34, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> I see. That explains why the load on the machines grew. However, I
>>>>>> think it is not the root cause that led to these consecutive checkpoint
>>>>>> timeouts. As I said in my first mail, the checkpointing process usually
>>>>>> took 1.5 mins to upload state, and this operator and the kafka consumer
>>>>>> are the only two operators that have state in my pipeline. In the best
>>>>>> case, I should never encounter a timeout problem that is only caused by
>>>>>> lots of pending checkpointing threads that have already timed out. Am I
>>>>>> right?
>>>>>>
>>>>>> Since these logs and stack traces were taken nearly 3 hours
>>>>>> after the first checkpoint timeout, I'm afraid that we couldn't actually
>>>>>> find the root cause of the first checkpoint timeout. Because we
>>>>>> are preparing to take this pipeline to production, I was wondering if
>>>>>> you could help me find out where the root cause lies: bad machines,
>>>>>> s3, the flink-s3-presto package, or the flink checkpointing thread. It
>>>>>> would be great if we could find it from the information that I provided;
>>>>>> a hypothesis based on your experience is welcome as well. The most
>>>>>> important thing is that I have to decide whether I need to change my
>>>>>> persistence filesystem or use another s3 filesystem package, because the
>>>>>> last thing I want to see is checkpoint timeouts happening very often.
>>>>>>
>>>>>> Thank you very much for all your advice.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thanks for all the info. I had a look into the problem and opened
>>>>>>> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From
>>>>>>> your stack trace, you can see many checkpointing threads are running on
>>>>>>> your TM for checkpoints that have already timed out, and I think this
>>>>>>> cascades and slows down everything. Seems like the implementation of
>>>>>>> some features like checkpoint timeouts and not failing tasks from
>>>>>>> checkpointing problems overlooked that we also need to properly
>>>>>>> communicate that checkpoint cancellation to all tasks, which was not
>>>>>>> needed before.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>>
>>>>>>> On 05.03.2018 at 14:42, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Here is my checkpointing configuration.
>>>>>>>
>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>> Interval: 20m 0s
>>>>>>> Timeout: 10m 0s
>>>>>>> Minimum Pause Between Checkpoints: 0ms
>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>> Persist Checkpoints Externally: Enabled (delete on cancellation)
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2018-03-05 21:30 GMT+08:00 Stefan Richter <
>>>>>>> s.rich...@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> quick question: what is your exact checkpointing configuration? In
>>>>>>>> particular, what is your value for the maximum parallel checkpoints and
>>>>>>>> the minimum time interval to wait between two checkpoints?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> > On 05.03.2018 at 06:34, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > Hi all,
>>>>>>>> >
>>>>>>>> > Last weekend, my flink job's checkpoints started failing because of
>>>>>>>> > timeouts. I have no idea what happened, but I collected some
>>>>>>>> > information about my cluster and job. I hope someone can give me
>>>>>>>> > advice or hints about the problem that I encountered.
>>>>>>>> >
>>>>>>>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs,
>>>>>>>> > each with 4 cores. These machines are ec2 spot instances. The job's
>>>>>>>> > parallelism is set to 32, using rocksdb as the state backend and s3
>>>>>>>> > presto as the checkpoint file system.
>>>>>>>> > The state's size is nearly 15gb and still grows day by day.
>>>>>>>> > Normally, it takes 1.5 mins to finish the whole checkpoint process.
>>>>>>>> > The timeout configuration is set to 10 mins.
>>>>>>>> >
>>>>>>>> > <chk_snapshot.png>
>>>>>>>> >
>>>>>>>> > As the picture shows, not every checkpoint subtask failed because of
>>>>>>>> > the timeout, but every machine has at some point failed for all its
>>>>>>>> > subtasks during the last weekend. Some machines recovered by
>>>>>>>> > themselves and some machines recovered after I restarted them.
>>>>>>>> >
>>>>>>>> > I recorded logs, stack traces and snapshots of the machines' status
>>>>>>>> > (CPU, IO, Network, etc.) for both a good and a bad machine. If there
>>>>>>>> > is a need for more information, please let me know. Thanks in advance.
>>>>>>>> >
>>>>>>>> > Best Regards,
>>>>>>>> > Tony Wei.
>>>>>>>> > <bad_tm_log.log><bad_tm_pic.png><bad_tm_stack.log><good_tm_log.log><good_tm_pic.png><good_tm_stack.log>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> <backpressure.png><good_tm.png><bad_tm.png>
>
>
>
