Hi Piotrek,

> In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
It seems plausible—there are potentially many keys (and many windows). Is there a way to confirm this with metrics? We can add a window fire counter to the window operator that only gets incremented at the end of window evaluation, in order to see the huge jumps in window fires. I can see this benefiting other users who troubleshoot the problem of large numbers of windows firing.

Best,
Mason

> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi Mason,
>
> > and it has to finish processing this output before checkpoint can begin—is this right?
>
> Yes. A checkpoint will only be executed once all triggered windows have been fully processed.
>
> But from what you have posted it looks like all of that delay is coming from hundreds of thousands of windows firing all at the same time. Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s * 60 s/min * 70 min = 151 200 triggers fired at once (or in a very short interval). In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
> On Thu, Dec 23, 2021 at 12:12 Mason Chen <mason.c...@apple.com <mailto:mason.c...@apple.com>> wrote:
> Hi Piotr,
>
> Thanks for the thorough response and the PR—will review later.
>
> Clarifications:
> 1. The flat map you refer to produces at most 1 record.
> 2. The session window operator’s window process function emits at least 1 record.
> 3. The 25 ms sleep is at the beginning of the window process function.
>
> Your explanation of how records being bigger than the buffer size can cause blockage makes sense to me. However, my average record size is around 770 bytes coming out of the source and 960 bytes coming out of the window. Also, we don’t override the default `taskmanager.memory.segment-size`. My Flink job memory config is as follows:
>
> ```
> taskmanager.memory.jvm-metaspace.size: 512 mb
> taskmanager.memory.jvm-overhead.max: 2Gb
> taskmanager.memory.jvm-overhead.min: 512Mb
> taskmanager.memory.managed.fraction: '0.4'
> taskmanager.memory.network.fraction: '0.2'
> taskmanager.memory.network.max: 2Gb
> taskmanager.memory.network.min: 200Mb
> taskmanager.memory.process.size: 16Gb
> taskmanager.numberOfTaskSlots: '4'
> ```
>
>> Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>
> To distinguish task blockage from graceful backpressure, I checked the operator throughput metrics and confirmed that during window task buffer blockage, the window operator DOES emit records. The tasks look like they aren’t doing anything, but the window is emitting records.
>
> <throughput_metrics.png>
>
> Furthermore, I created a custom trigger that wraps a metric counter for FIRED counts, to get an estimate of how many windows are fired at the same time. I ran a separate job with the same configs—the results look as follows:
>
> <trigger_metrics.png>
>
> On average, when the buffers are blocked, there are 36 FIREs per second. Since each of these fires invokes the window process function, 25 ms * 36 = 900 ms means we sleep almost a second cumulatively, per second—which is pretty severe.
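For reference, a minimal sketch of what such a fire-counting trigger could look like, assuming it wraps the default event-time trigger and reports through the trigger's metric group (the class name `FireCountingTrigger` and the metric name `windowFires` are illustrative, not taken from the actual job):

```
// Hypothetical fire-counting trigger (class and metric names are illustrative).
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireCountingTrigger extends Trigger<Object, TimeWindow> {

    private final EventTimeTrigger delegate = EventTimeTrigger.create();
    private transient Counter fireCounter;

    // Increment the counter whenever the delegate decides to fire the window.
    private TriggerResult counted(TriggerResult result, TriggerContext ctx) {
        if (fireCounter == null) {
            fireCounter = ctx.getMetricGroup().counter("windowFires");
        }
        if (result.isFire()) {
            fireCounter.inc();
        }
        return result;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return counted(delegate.onElement(element, timestamp, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return counted(delegate.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return counted(delegate.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return true; // session windows merge, so delegate merging as well
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        delegate.onMerge(window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }
}
```

Attaching it with `.trigger(new FireCountingTrigger())` on the session window keeps the default event-time firing behavior and only adds a per-subtask counter of FIRE decisions.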
> Combined with the fact that the window process function can emit many records, the task takes even longer to checkpoint, since the flatmap/kafka sink is chained with the window operator—and it has to finish processing this output before the checkpoint can begin—is this right? In addition, when the number of window fires per second drops, the checkpoint is able to continue and succeed.
>
> So, I think that the surge of window firing combined with the sleep is the source of the issue, which makes sense. I’m not sure how to confirm whether or not the points about buffer sizes being insufficient for the window output are also interplaying with this issue.
>
> Best,
> Mason
>
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowoj...@apache.org <mailto:pnowoj...@apache.org>> wrote:
>>
>> Hi Mason,
>>
>> One more question. Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>>
>> Best,
>> Piotrek
>>
>> On Wed, Dec 22, 2021 at 10:02 Piotr Nowojski <pnowoj...@apache.org <mailto:pnowoj...@apache.org>> wrote:
>> Hi,
>>
>> Thanks for getting back to us. This is indeed weird.
>>
>> >> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>> >
>> > This is true for aligned checkpoints too, right?
>>
>> In a sense. For aligned checkpoints there is a stronger limitation: the task has to process all of the buffered records on its input before it's able to do an aligned checkpoint. For unaligned checkpoints the task only has to finish fully processing the currently processed record.
>>
>> > 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>>
>> Kind of. You do not have to acquire the checkpoint lock, as FLIP-27 sources work in the task thread. But the task thread can not process records and do a checkpoint at the same time. A FLIP-27 source will not pick up the next record from the input if there is backpressure (which allows a checkpoint to be triggered while the task is back pressured), but this back pressure detection mechanism (or rather the mechanism that prevents blocking waits of the task thread when there is back pressure) is not perfect. A couple of the largest limitations are:
>> a) If a single record doesn't fit in a single network buffer (for example, the default network buffer size is 32KB and your record size can reach 33KB), back pressure detection will allow the next record to be processed since there will be some buffer available, but the produced record won't fit into this single buffer and will have to blockingly wait for another buffer to be recycled (increasing start delay and/or alignment time).
>> b) If you have a flat map style operator/function in the chain that multiplies the number of records, you can hit exactly the same problem. For example, the network buffer is 32KB and the record size is 330B, but you have a flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB, so again you might end up with the task being blocked, as a single buffer wouldn't be enough to serialize all of those 100 records.
>> c) The same as b), but caused by a timer/watermark triggering the WindowOperator to produce lots of records.
>>
>> > 2. When the source operator finished for checkpoint 337, why is start delay high for the window?
>> > Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>>
>> All of those points actually apply to every task, not only the FLIP-27 source task, and maybe they could explain why the window/flat map task has been blocked for ~2.5h.
>>
>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record size sometimes exceeds the network buffer size, this can cause the source task to be blocked for those 6 hours. The source task will simply be stuck waiting for a buffer to be recycled, and this will only happen once a downstream task processes one more buffer.
>>
>> > 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>
>> In the WebUI the task nodes are colored depending on the busy/backpressured time. You can clearly see that the source is fully backpressured all the time, while the window is constantly busy. I presume your function that induces the 25ms per-record sleep is chained with the window. That confirms for me that the window task is the bottleneck. However, unfortunately there is no easy way to tell how severe this back pressure is or for how long those tasks are blocked. In other words, a task that is busy with a steady stream of quickly processed records and a task that is blocked processing a single record for 6h will both have the same 100% Busy metric. The same goes for being blocked on back pressure (a task back pressured for many short moments and a task back pressured for hours on end will both show a 100% back pressure metric). Moreover, there is currently no way to distinguish whether a task is back pressured in a graceful way, without blocking the task thread, or whether it is indeed blocking the task thread (due to a), b) or c)). I have created a ticket to add some metrics to help with that [1], but it won't help you right now.
>>
>> I. You could do some estimations on paper to check whether anything that I have written above can theoretically happen. You should know the size of the windows, the record sizes, and what your Flat Map functions are doing (it seems like you have two of those chained after the WindowOperator?). From the looks of it, the 25ms sleep per record, WindowOperator + Flat Map, and the huge state of the window operators suggest that it's possible.
>> II. As those tasks are blocked for hours, triggering a checkpoint and collecting some stack traces can help you understand what the tasks are actually doing. But for that you would need to understand how to differentiate a blocked task, so...
>> III. ... maybe actually the most efficient way for us to help you would be if you could minimize/simplify your job, replacing the Kafka source with an artificial source that generates records, in such a way that still reproduces this behavior, and share your code with us?
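As a starting point for III., here is a rough sketch of what such a minimal reproducer could look like, assuming an artificial source that emits many keys and then jumps event time past the session gap so that a large batch of session windows fires at once. The class name, key count and timing constants are made up for illustration; the real job reads from Kafka and writes to Kafka sinks instead.

```
// Hypothetical minimal reproducer (names and constants invented for illustration).
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WindowBackpressureRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000); // 5 min, as in the real job

        env.addSource(new SourceFunction<Tuple2<Long, Long>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Long>> ctx) {
                        long eventTime = 0;
                        while (running) {
                            // One element per key, all with the same timestamp -> many open sessions.
                            for (long key = 0; key < 10_000; key++) {
                                ctx.collect(Tuple2.of(key, eventTime));
                            }
                            // Jump event time past the 300 s session gap, so the watermark
                            // closes all of those sessions at (roughly) the same moment.
                            eventTime += 400_000;
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Long, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, previous) -> element.f1))
                .keyBy(element -> element.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(300)))
                .allowedLateness(Time.seconds(180))
                .process(new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
                    @Override
                    public void process(Long key, Context context, Iterable<Tuple2<Long, Long>> elements,
                                        Collector<String> out) throws Exception {
                        Thread.sleep(25); // the artificial 25 ms per-invocation delay from the test
                        out.collect("fired session window for key " + key);
                    }
                })
                .print();

        env.execute("window-backpressure-repro");
    }
}
```

Under these made-up settings, every event-time jump closes one session per key, so thousands of windows fire back to back against the 25 ms sleep, which is the pattern suspected in this thread.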
>>
>> Best, Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25414
>>
>> On Tue, Dec 21, 2021 at 20:10 Mason Chen <mason.c...@apple.com <mailto:mason.c...@apple.com>> wrote:
>> Hi Piotr,
>>
>> These observations correspond to the 0ms alignment timeout setting.
>>
>> The checkpoints are timing out because the checkpoint acknowledgement is timing out. We have now increased the timeout to 3 hours in our checkpoints and we still face errors due to checkpoint acknowledgement—the rest of the checkpoint config is still the same.
>>
>> This is our job graph:
>> <job_graph.png>
>> To give more details about the window, we use the default event time trigger with a gap of 300 seconds and an allowed lateness of 180 seconds. The window only implements the process function, in which it emits 1 element.
>>
>> Here are the screenshots of the failed checkpoints. Failures typically come in groups like this. On average, checkpoints complete in 2m 49s.
>>
>> <failed_checkpoint_summary.png>
>>
>> To show a few of the failed checkpoints in more detail:
>>
>> For checkpoint 337, the source finishes its checkpoint within normal latency and the window checkpoint times out due to high start delay.
>> <checkpoint_337.png>
>>
>> For checkpoint 338, we see very high start delay at the source, which blocks the window operator from completing its checkpoint. I sorted the subtasks by end-to-end duration to give an idea of the worst start delay. Start delay even shows values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
>> <checkpoint_338.png>
>>
>>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>>
>> This is true for aligned checkpoints too, right?
>>
>> So my questions are:
>>
>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome, since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>
>> Thanks,
>> Mason
>>
>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowoj...@apache.org <mailto:pnowoj...@apache.org>> wrote:
>>>
>>> Hi Mason,
>>>
>>> Have you already observed those checkpoint timeouts (30 minutes) with the alignment timeout set to 0ms? Or were you previously running with the 1s alignment timeout?
>>>
>>> If the latter, it might be because unaligned checkpoints are failing to kick in in the first place. Setting the timeout to 0ms should solve the problem.
>>>
>>> If the former, have you checked why the checkpoints are timing out? What part of the checkpointing process is taking a long time? For example, can you post a screenshot from the WebUI of the checkpoint stats for each task? The only explanation I could think of is this sleep time that you added. 25ms per record is really a lot.
>>> I mean really a lot. 30 minutes / 25 ms/record = 72 000 records. One of the unaligned checkpoints limitations is that Flink can not snapshot the state of an operator in the middle of processing a record. In your particular case, Flink will not be able to snapshot the state of the session window operator in the middle of the windows being fired. If your window operator is firing a lot of windows at the same time, or a single window is producing 72k records (which would be an unusual but not unimaginable amount), this could block checkpointing of the window operator for 30 minutes due to this 25ms sleep down the stream.
>>>
>>> Piotrek
>>>
>>> On Fri, Dec 17, 2021 at 19:19 Mason Chen <mason.c...@apple.com <mailto:mason.c...@apple.com>> wrote:
>>> Hi Piotr,
>>>
>>> Thanks for the link to the JIRA ticket. We actually don’t see much state size overhead between checkpoints in aligned vs unaligned, so we will go with your recommendation of using unaligned checkpoints with a 0s alignment timeout.
>>>
>>> For context, we are testing unaligned checkpoints with our application with these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest of the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>>
>>> We added an artificial sleep (25 ms per invocation of the process function) to the session window task to simulate backpressure; however, we still see checkpoints failing because task acknowledgement doesn’t complete within our checkpoint timeout (30 minutes).
>>>
>>> I am able to correlate 100% usage of the window's input buffers and the source's output buffers with the checkpoint failures. When they are not full (input can drop to as low as 60% usage and output can drop to as low as 55% usage), the checkpoints succeed within less than 2 ms. In all cases, it is the session window task or source task failing to fully acknowledge the barriers within the timeout. I do see the source task acknowledgement taking long in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and the source is idle and not busy at this time.
>>>
>>> All other input buffers are at low usage (mostly 0). For output buffers, the usage is around 50% for the window; everything else is near 0% all the time except the source mentioned before (which makes sense since the rest are just sinks).
>>>
>>> We are also running a parallel Flink job with the same configuration, except with unaligned checkpoints disabled. Here we observe the same behavior, except now some of the checkpoints are failing due to the source task not acknowledging everything within the timeout—however, most failures are still due to session window acknowledgement.
>>>
>>> All the data seems to point to an issue with the source? Now, I don’t know how to explain this behavior, since unaligned checkpoints should overtake records in the buffers (once a barrier is seen at the input buffer, it is forwarded immediately downstream to the output buffer).
>>>
>>> Just to confirm, this is our checkpoint configuration:
>>> ```
>>> Option                              Value
>>> Checkpointing Mode                  Exactly Once
>>> Checkpoint Storage                  FileSystemCheckpointStorage
>>> State Backend                       EmbeddedRocksDBStateBackend
>>> Interval                            5m 0s
>>> Timeout                             30m 0s
>>> Minimum Pause Between Checkpoints   2m 0s
>>> Maximum Concurrent Checkpoints      1
>>> Unaligned Checkpoints               Enabled
>>> Persist Checkpoints Externally      Enabled (retain on cancellation)
>>> Tolerable Failed Checkpoints        10
>>> ```
>>>
>>> Are there other metrics I should look at—why else would tasks fail acknowledgement in unaligned mode? Is it something about the implementation details of the window function that I am not considering? My main hunch is that it is something to do with the source.
>>>
>>> Best,
>>> Mason
>>>
>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org <mailto:pnowoj...@apache.org>> wrote:
>>>>
>>>> Hi Mason,
>>>>
>>>> In Flink 1.14 we have also changed the timeout behavior from checking against the alignment duration to simply checking how old the checkpoint barrier is (so it also accounts for the start delay) [1]. It was done in order to solve problems like the one you are describing. Unfortunately we can not backport this change to 1.13.x as it's a breaking change.
>>>>
>>>> Anyway, from our experience I would recommend going all in with unaligned checkpoints, so setting the timeout back to the default value of 0ms. With timeouts you are gaining very little (a tiny bit smaller state size if there is no backpressure - tiny because without backpressure, even with the timeout set to 0ms, the amount of captured in-flight data is basically insignificant), while in practice you slow down checkpoint barrier propagation by quite a lot.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041
>>>>
>>>> On Tue, Dec 14, 2021 at 22:04 Mason Chen <mas.chen6...@gmail.com <mailto:mas.chen6...@gmail.com>> wrote:
>>>> Hi all,
>>>>
>>>> I'm using Flink 1.13 and my job is experiencing high start delay, more so than high alignment time (our FLIP 27 Kafka source is heavily backpressured). Since our alignment timeout is set to 1s, the unaligned checkpoint never triggers, since the alignment delay is always below the threshold.
>>>>
>>>> It seems there is only a configuration for the alignment timeout, but should there also be one for a start delay timeout:
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>>
>>>> I'm interested in the reasoning why there isn't a timeout for start delay as well--was it because it was deemed too complex for the user to configure two parameters for unaligned checkpoints?
>>>>
>>>> I'm aware of buffer debloating in 1.14 that could help, but I'm trying to see how far unaligned checkpointing can take me.
>>>>
>>>> Best,
>>>> Mason
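For completeness, the checkpoint settings listed above, together with Piotrek's recommendation of going back to the default 0 ms alignment timeout, could be expressed programmatically along these lines on Flink 1.13. This is a sketch: the checkpoint directory is a placeholder, and `setAlignmentTimeout` is the 1.13 method name that was later renamed to `setAlignedCheckpointTimeout` in 1.14.

```
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State backend and filesystem checkpoint storage (placeholder directory).
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE); // 5 min interval
        CheckpointConfig conf = env.getCheckpointConfig();
        conf.setCheckpointStorage("hdfs:///flink/checkpoints");    // placeholder path
        conf.setCheckpointTimeout(30 * 60 * 1000);                 // 30 min
        conf.setMinPauseBetweenCheckpoints(2 * 60 * 1000);         // 2 min
        conf.setMaxConcurrentCheckpoints(1);
        conf.setTolerableCheckpointFailureNumber(10);
        conf.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Unaligned checkpoints with the alignment timeout back at the default 0 ms,
        // so barriers can overtake in-flight buffers immediately under back pressure.
        conf.enableUnalignedCheckpoints(true);
        conf.setAlignmentTimeout(Duration.ZERO);

        // ... build the job graph and call env.execute(...) here.
    }
}
```

The same settings can also be applied through the Flink configuration file via the corresponding `execution.checkpointing.*` options.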