Hi Piotrek,

> In other words, something (presumably a watermark) has fired more than 151 
> 200 windows at once, which is taking ~1h 10minutes to process and during this 
> time the checkpoint can not make any progress. Is this number of triggered 
> windows plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there 
a way to confirm with metrics? We can add a window fire counter to the window 
operator that only gets incremented at the end of windows evaluation, in order 
to see the huge jumps in window fires. I can this benefiting other users who 
troubleshoot the problem of large number of window firing.

Best,
Mason

> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
> 
> Hi Mason,
> 
> > and it has to finish processing this output before checkpoint can begin—is 
> > this right?
> 
> Yes. Checkpoint will be only executed once all triggered windows will be 
> fully processed. 
> 
> But from what you have posted it looks like all of that delay is coming from 
> hundreds of thousands of windows firing all at the same time. Between 20:30 
> and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 
> 70min = 151 200triggers fired at once (or in a very short interval). In other 
> words, something (presumably a watermark) has fired more than 151 200 windows 
> at once, which is taking ~1h 10minutes to process and during this time the 
> checkpoint can not make any progress. Is this number of triggered windows 
> plausible in your scenario?
> 
> Best,
> Piotrek
> 
> 
> czw., 23 gru 2021 o 12:12 Mason Chen <mason.c...@apple.com 
> <mailto:mason.c...@apple.com>> napisał(a):
> Hi Piotr,
> 
> Thanks for the thorough response and the PR—will review later.
> 
> Clarifications:
> 1. The flat map you refer to produces at most 1 record.
> 2. The session window operator’s window process function emits at least 1 
> record. 
> 3. The 25 ms sleep is at the beginning of the window process function.
> 
> Your explanation about how records being bigger than the buffer size can 
> cause blockage makes sense to me. However, my average record size is around 
> 770 bytes coming out of the source and 960 bytes coming out of the window. 
> Also, we don’t override the default `taskmanager.memory.segment-size`. My 
> Flink job memory config is as follows:
> 
> ```
>         taskmanager.memory.jvm-metaspace.size: 512 mb
>         taskmanager.memory.jvm-overhead.max: 2Gb
>         taskmanager.memory.jvm-overhead.min: 512Mb
>         taskmanager.memory.managed.fraction: '0.4'
>         taskmanager.memory.network.fraction: '0.2'
>         taskmanager.memory.network.max: 2Gb
>         taskmanager.memory.network.min: 200Mb
>         taskmanager.memory.process.size: 16Gb
>         taskmanager.numberOfTaskSlots: '4'
> ```
> 
>>  Are you sure your job is making any progress? Are records being processed? 
>> Hasn't your job simply deadlocked on something?
> 
> To distinguish task blockage vs graceful backpressure, I have checked the 
> operator throughput metrics and have confirmed that during window task buffer 
> blockage, the window operator DOES emit records. Tasks look like they aren’t 
> doing anything but the window is emitting records.
> 
> <throughput_metrics.png>
> 
> 
> Furthermore, I created a custom trigger to wrap a metric counter for FIRED 
> counts to get a estimation of how many windows are fired at the same time. I 
> ran a separate job with the same configs—the results look as follows:
> <trigger_metrics.png>
> 
> On average, when the buffers are blocked, there are 36 FIREs per second. 
> Since each of these fires invokes the window process function, 25 ms * 36 = 
> 900 ms means we sleep almost a second cumulatively, per second—which is 
> pretty severe. Combined with the fact that the window process function can 
> emit many records, the task takes even longer to checkpoint since the 
> flatmap/kafka sink is chained with the window operator—and it has to finish 
> processing this output before checkpoint can begin—is this right? In 
> addition, when the window fires per second reduces, checkpoint is able to 
> continue and succeed.
> 
> So, I think that the surge of window firing combined with the sleep is the 
> source of the issue, which makes sense. I’m not sure how to confirm whether 
> or not the points about buffer sizes being insufficient for the window output 
> is also interplaying with this issue.
> 
> Best,
> Mason
> 
> 
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowoj...@apache.org 
>> <mailto:pnowoj...@apache.org>> wrote:
>> 
>> Hi Mason,
>> 
>> One more question. Are you sure your job is making any progress? Are records 
>> being processed? Hasn't your job simply deadlocked on something?
>> 
>> Best,
>> Piotrek
>> 
>> śr., 22 gru 2021 o 10:02 Piotr Nowojski <pnowoj...@apache.org 
>> <mailto:pnowoj...@apache.org>> napisał(a):
>> Hi,
>> 
>> Thanks for getting back to us. This is indeed weird.
>> 
>> >> One of the unaligned checkpoints limitations is that Flink can not 
>> >> snapshot a state of an operator in the middle of processing a record.
>> >
>> >This is true for aligned checkpoints too, right?
>> 
>> In a sense. For aligned checkpoints there is a stronger limitation, that the 
>> task has to process all of the buffered records on the input before it's 
>> able to do an aligned checkpoint. For unaligned checkpoints the task has to 
>> finish fully processing only the currently processed record.
>> 
>> > 1. Why is there high start delay at the source? Isn’t this what FLIP 27 
>> > sources are designed to overcome since the need to acquire the checkpoint 
>> > lock is irrelevant? Is it a bug?
>> 
>> Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources are 
>> working in the task thread. But the task thread can not process records and 
>> do a checkpoint at the same time. FLIP-27 source will not pick up a next 
>> record from the input if there is a backpressure (that allows checkpoint to 
>> be triggered while task is back pressured), but this back pressure detection 
>> mechanism (or rather mechanism that prevents blocking waits of the task 
>> thread when there is a back pressure) is not perfect. A couple of the 
>> largests limitations are:
>> a) If your single record doesn't fit in a single network buffer, for example 
>> network buffer default size is 32KB and your record size can reach 33KB, 
>> back pressure detection will allow to process next record since there will 
>> be some buffer available, but the produced record won't fit into this single 
>> buffer and will have to blockingly wait for another buffer to be recycled 
>> (increasing start delay and/or alignment time).
>> b) If you have a flat map style operator/function in the chain, that 
>> multiplies the number of records you can hit exactly the same problem. For 
>> example, the network buffer is 32KB, record size is 330B, but you have a 
>> flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB 
>> so again you might end up with the task being blocked as a single buffer 
>> wouldn't be enough to serialize all of those 100 records. 
>> c) The same as b), but caused by a timer/watermark triggering WindowOperator 
>> to produce lots of records.
>> 
>> > 2. When the source operator finished for checkpoint 337, why is start 
>> > delay high for the window? Barriers should have been forwarded downstream 
>> > quite quickly unless the window operator is blocking for a few hours...
>> 
>> All of those points apply actually to every task, not only FLIP-27 source 
>> task and maybe they could explain why the window/flat map task has been 
>> blocked for ~2.5h. 
>> 
>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your 
>> record size is sometimes exceeding network buffer size, this can cause the 
>> source task to be blocked for those 6 hours. Source task will be simply 
>> stuck waiting for a buffer to be recycled, and this will only happen once a 
>> downstream task will process one more buffer. 
>> 
>> > 3. If the window is the bottleneck, what are the various ways to confirm 
>> > this? We have metrics to measure the process function but we don’t know 
>> > how many windows are getting fired at the same time to give the overall 
>> > latency for the operator. Are there metrics or logs to see how many 
>> > windows are getting fired or how long the window operator is blocking the 
>> > window input buffers from processing?
>> 
>> In the webUI the task nodes are colored depending on the busy/backpressured 
>> time. You can clearly see that the source is fully backpressured all the 
>> time, while the window is constantly busy. I presume your function that is 
>> inducing 25ms per record sleep time is chained with the window. That 
>> confirms for me that the window task is the bottleneck. However 
>> unfortunately there is no easy way to tell how severe this back pressure and 
>> for how long those tasks are blocked. In other words, a task that is busy 
>> processing records for 1ms every 1ns and a Task that is blocked busy 
>> processing a single record for 6h will both have the same 100% Busy metric. 
>> Same goes for blocked on the back pressure (both task back pressured for 1ms 
>> every 1ns and task back pressured 1h every 1ns will have 100% back pressure 
>> metric). Moreover there is currently no way to distinguish if a task is back 
>> pressured in a graceful way, without blocking the task thread, or if it is 
>> indeed blocking the task thread (due to a), b) or c)). I have created a 
>> ticket to add some metrics to help with that [1], but it won't help you 
>> right now.
>> 
>> I. You could do some estimations on paper if anything that I have written 
>> above can theoretically happen. You should know the size of the 
>> windows/record sizes/what your Flat Map functions are doing (it seems like 
>> you have two of those chained after the WindowOperator?). From the looks of 
>> it, 25ms sleep per record, WindowOperator + Flat Map, huge state of window 
>> operators might suggest that it's possible.
>> II.  As those tasks are blocked for hours, triggering a checkpoint and 
>> collecting some stack traces can help you understand what the tasks are 
>> actually doing. But for that you would need to understand how to 
>> differentiate a blocked task, so...
>> III. ... maybe actually the most efficient way for us to help you would be 
>> if you could minimize/simplify your job, replace Kafka source with an 
>> artificial source that would be generating records, but in such a way that 
>> would still reproduce this behavior and share your code with us?
>> 
>> Best, Piotrek
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-25414 
>> <https://issues.apache.org/jira/browse/FLINK-25414>
>> 
>> 
>> wt., 21 gru 2021 o 20:10 Mason Chen <mason.c...@apple.com 
>> <mailto:mason.c...@apple.com>> napisał(a):
>> Hi Piotr,
>> 
>> These observations correspond to the 0ms alignment timeout setting.
>> 
>> The checkpoints are timeouting because the checkpoint acknowledgement is 
>> timing out. Now, we increased the timeout to 3 hours in our checkpoints and 
>> we still face errors due to checkpoint acknowledgement—the rest of the 
>> checkpoint config is still the same.
>> 
>> This is our job graph:
>> <job_graph.png>
>> To give more details about the window, we use the default event time trigger 
>> with a gap of 300 seconds and 180 allowed lateness. The window only 
>> implements the process function in which it emits 1 element.
>> 
>> Here are the screenshots of the failed checkpoints. Failures typically come 
>> in groups like this. On average, checkpoints complete in 2m 49s.
>> 
>> <failed_checkpoint_summary.png>
>> 
>> To show a few of the failed checkpoints in more detail:
>> 
>> For checkpoint 337, the source finishes checkpoint within a normal latency 
>> and the window checkpoint times out due to high start delay.
>> <checkpoint_337.png>
>> 
>> For checkpoint 338, we see very high start delay at the source and blocks 
>> the window operator from completing its checkpoint. I sorted by end to end 
>> duration for the subtasks to give an idea of the worst start delay. Start 
>> delay even show values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
>> <checkpoint_338.png>
>> 
>> 
>>> One of the unaligned checkpoints limitations is that Flink can not snapshot 
>>> a state of an operator in the middle of processing a record.
>> 
>> This is true for aligned checkpoints too, right?
>> 
>> So my questions are:
>> 
>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 
>> sources are designed to overcome since the need to acquire the checkpoint 
>> lock is irrelevant? Is it a bug?
>> 2. When the source operator finished for checkpoint 337, why is start delay 
>> high for the window? Barriers should have been forwarded downstream quite 
>> quickly unless the window operator is blocking for a few hours...
>> 3. If the window is the bottleneck, what are the various ways to confirm 
>> this? We have metrics to measure the process function but we don’t know how 
>> many windows are getting fired at the same time to give the overall latency 
>> for the operator. Are there metrics or logs to see how many windows are 
>> getting fired or how long the window operator is blocking the window input 
>> buffers from processing?
>> 
>> Thanks,
>> Mason
>> 
>> 
>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowoj...@apache.org 
>>> <mailto:pnowoj...@apache.org>> wrote:
>>> 
>>> Hi Mason,
>>> 
>>> Those checkpoint timeouts (30 minutes) have you already observed with the 
>>> alignment timeout set to 0ms? Or as you were previously running it with 1s 
>>> alignment timeout?
>>> 
>>> If the latter, it might be because unaligned checkpoints are failing to 
>>> kick in in the first place. Setting the timeout to 0ms should solve the 
>>> problem.
>>> 
>>> If the former, have you checked why the checkpoints are timeouting? What 
>>> part of the checkpointing process is taking a long time? For example can 
>>> you post a screenshot from the WebUI of checkpoint stats for each task? The 
>>> only explanation I could think of is this sleep time that you added. 25ms 
>>> per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record 
>>> = 72 000 records. One of the unaligned checkpoints limitations is that 
>>> Flink can not snapshot a state of an operator in the middle of processing a 
>>> record. In your particular case, Flink will not be able to snapshot the 
>>> state of the session window operator in the middle of the windows being 
>>> fired. If your window operator is firing a lot of windows at the same time, 
>>> or a single window is producing 72k of records (which would be an unusual 
>>> but not unimaginable amount), this could block checkpointing of the window 
>>> operator for 30 minutes due to this 25ms sleep down the stream.
>>> 
>>> Piotrek
>>> 
>>> pt., 17 gru 2021 o 19:19 Mason Chen <mason.c...@apple.com 
>>> <mailto:mason.c...@apple.com>> napisał(a):
>>> Hi Piotr,
>>> 
>>> Thanks for the link to the JIRA ticket, we actually don’t see much state 
>>> size overhead between checkpoints in aligned vs unaligned, so we will go 
>>> with your recommendation of using unaligned checkpoints with 0s alignment 
>>> timeout.
>>> 
>>> For context, we are testing unaligned checkpoints with our application with 
>>> these tasks: [kafka source, map, filter] -> keyby -> [session window] -> 
>>> [various kafka sinks]. The first task has parallelism 40 and the rest of 
>>> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>> 
>>> We added an artificial sleep (25 ms per invocation of in process function) 
>>> the session window task to simulate backpressure; however, we still see 
>>> checkpoints failing due to task acknowledgement doesn’t complete within our 
>>> checkpoint timeout (30 minutes).
>>> 
>>> I am able to correlate that the input buffers from window and output 
>>> buffers from source being 100% usage corresponds to the checkpoint 
>>> failures. When they are not full (input can drop to as low as 60% usage and 
>>> output can drop to as low as 55% usage), the checkpoints succeed within 
>>> less than 2 ms. In all cases, it is the session window task or source task 
>>> failing to 100% acknowledge the barriers within timeout. I do see the 
>>> source task acknowledgement taking long in some of the failures (e.g. 20 
>>> minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and source is idle and 
>>> not busy at this time.
>>> 
>>> All other input buffers are low usage (mostly 0). For output buffer, the 
>>> usage is around 50% for window--everything else is near 0% all the time 
>>> except the source mentioned before (makes sense since rest are just sinks).
>>> 
>>> We are also running a parallel Flink job with the same configurations, 
>>> except with unaligned checkpoints disabled. Here we see observe the same 
>>> behavior except now some of the checkpoints are failing due to the source 
>>> task not acknowledging everything within timeout—however, most failures are 
>>> still due to session window acknowledgement.
>>> 
>>> All the data seems to points an issue with the source? Now, I don’t know 
>>> how to explain this behavior since unaligned checkpoints should overtake 
>>> records in the buffers (once seen at the input buffer, forward immediately 
>>> downstream to output buffer).
>>> 
>>> Just to confirm, this is our checkpoint configuration:
>>> ```
>>> Option
>>> Value
>>> Checkpointing Mode  Exactly Once
>>> Checkpoint Storage  FileSystemCheckpointStorage
>>> State Backend       EmbeddedRocksDBStateBackend
>>> Interval    5m 0s
>>> Timeout     30m 0s
>>> Minimum Pause Between Checkpoints   2m 0s
>>> Maximum Concurrent Checkpoints      1
>>> Unaligned Checkpoints       Enabled
>>> Persist Checkpoints Externally      Enabled (retain on cancellation)
>>> Tolerable Failed Checkpoints        10
>>> ```
>>> 
>>> Are there other metrics should I look at—why else should tasks fail 
>>> acknowledgement in unaligned mode? Is it something about the implementation 
>>> details of window function that I am not considering? My main hunch is 
>>> something to do with the source.
>>> 
>>> Best,
>>> Mason
>>> 
>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org 
>>>> <mailto:pnowoj...@apache.org>> wrote:
>>>> 
>>>> Hi Mason,
>>>> 
>>>> In Flink 1.14 we have also changed the timeout behavior from checking 
>>>> against the alignment duration, to simply checking how old is the 
>>>> checkpoint barrier (so it would also account for the start delay) [1]. It 
>>>> was done in order to solve problems as you are describing. Unfortunately 
>>>> we can not backport this change to 1.13.x as it's a breaking change.
>>>> 
>>>> Anyway, from our experience I would recommend going all in with the 
>>>> unaligned checkpoints, so setting the timeout back to the default value of 
>>>> 0ms. With timeouts you are gaining very little (a tiny bit smaller state 
>>>> size if there is no backpressure - tiny bit because without backpressure, 
>>>> even with timeout set to 0ms, the amount of captured inflight data is 
>>>> basically insignificant), while in practise you slow down the checkpoint 
>>>> barriers propagation time by quite a lot.
>>>> 
>>>> Best,
>>>> Piotrek
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041 
>>>> <https://issues.apache.org/jira/browse/FLINK-23041>
>>>> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6...@gmail.com 
>>>> <mailto:mas.chen6...@gmail.com>> napisał(a):
>>>> Hi all,
>>>> 
>>>> I'm using Flink 1.13 and my job is experiencing high start delay, more so 
>>>> than high alignment time. (our flip 27 kafka source is heavily 
>>>> backpressured). Since our alignment timeout is set to 1s, the unaligned 
>>>> checkpoint never triggers since alignment delay is always below the 
>>>> threshold.
>>>> 
>>>> It's seems there is only a configuration for alignment timeout but should 
>>>> there also be one for start delay timeout: 
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>>  
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout>
>>>> 
>>>> I'm interested to know the reasoning why there isn't a timeout for start 
>>>> delay as well--was it because it was deemed too complex for the user to 
>>>> configure two parameters for unaligned checkpoints?
>>>> 
>>>> I'm aware of buffer debloating in 1.14 that could help but I'm trying to 
>>>> see how far unaligned checkpointing can take me.
>>>> 
>>>> Best,
>>>> Mason
>>> 
>> 
> 

Reply via email to