Hi Alexis,

You can read about those metrics in the documentation [1]. Long alignment
duration and long start delay almost always come together. High values
indicate that checkpoint barriers take a long time to propagate through
the job graph, and so far I have always seen that caused by the same
thing: backpressure. Which brings me to

> There is no backpressure in any operator.

Why do you think so?

For analysing backpressure I would highly recommend upgrading to Flink 1.13.x
as it has greatly improved tooling for that [2]. Since Flink 1.10 I believe
you can use the `isBackPressured` metric. In previous versions you would
have to rely on buffer usage metrics as described here [3].

If this is indeed a problem with backpressure, there are three things you
could do to improve checkpointing time:
a) Reduce the backpressure, either by optimising your job/code or by
scaling up.
b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do
this automatically when buffer debloating is enabled, but the same
principle can be used to manually and statically configure the cluster to
have less in-flight data. You can read about this here [4].
c) Enable unaligned checkpoints [5].
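
To make (b) more concrete, here is a rough back-of-the-envelope sketch in
plain Java (not Flink API) of how much data can sit buffered in front of a
single input gate. It assumes each input channel has a few exclusive
buffers and each gate has some floating buffers, with the defaults as I
remember them: taskmanager.network.memory.buffers-per-channel = 2,
taskmanager.network.memory.floating-buffers-per-gate = 8 and
taskmanager.memory.segment-size = 32kb. The class name, the 8 input
channels and the reduced values are made up for illustration; please check
[4] for your version before changing anything.

```java
// Rough estimate only, under the assumption that
// buffers per gate = channels * exclusive buffers per channel + floating buffers.
final class InFlightEstimate {

    // Upper bound (in bytes) on the data buffered in front of one input gate.
    static long bytesPerGate(int channels, int buffersPerChannel,
                             int floatingBuffersPerGate, int segmentSizeBytes) {
        long buffers = (long) channels * buffersPerChannel + floatingBuffersPerGate;
        return buffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int channels = 8;            // hypothetical: 8 upstream subtasks feed this gate
        int segment = 32 * 1024;     // assumed default segment size (32 KiB)

        long defaults = bytesPerGate(channels, 2, 8, segment); // assumed defaults
        long reduced = bytesPerGate(channels, 1, 2, segment);  // illustrative lower values

        System.out.println("defaults: " + defaults + " bytes per gate");
        System.out.println("reduced:  " + reduced + " bytes per gate");
    }
}
```

With these numbers the estimate drops from 786432 to 327680 bytes per gate.
A checkpoint barrier has to wait behind whatever is already buffered, so
less buffered data usually means shorter start delays under backpressure,
at the cost of some throughput headroom.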

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2] https://flink.apache.org/2021/07/07/backpressure.html
[3] https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
[4] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Piotrek

On Thu, 21 Oct 2021 at 19:00, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> I would really appreciate more fine-grained information regarding the
> factors that can affect a checkpoint’s:
>
>
>
>    - Sync duration
>    - Async duration
>    - Alignment duration
>    - Start delay
>
>
>
> Otherwise those metrics don’t really help me know in which areas to look
> for issues.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Sent:* Wednesday, 20 October 2021 09:43
> *To:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <
> tsreape...@gmail.com>
> *Cc:* Flink ML <user@flink.apache.org>
> *Subject:* RE: Troubleshooting checkpoint timeout
>
>
>
> Currently the windows are 10 minutes in size with a 1-minute slide time.
> The throughput of roughly 500 events/minute is already rather high for my
> use case, so I don’t expect it to be higher, but I would imagine that’s
> still pretty low.
>
>
>
> I did have some issues with storage space, and I wouldn’t be surprised if
> there is an IO bottleneck in my dev environment, but then my main question
> would be: if IO is being throttled, could that result in the high “start
> delay” times I observe? That seems to be the main slowdown, so I just want
> to be sure I’m looking in the right direction.
>
>
>
> I’d like to mention another thing about my pipeline’s structure in case
> it’s relevant, although it may be completely unrelated. I said that I
> specify the windowing properties once (windowedStream in my 1st e-mail)
> and use it twice, but it’s actually used 3 times. In addition to the 2
> ProcessWindowFunctions that end in sinks, the stream is also joined with a
> side output:
>
>
>
> openedEventsTimestamped = openedEvents
>
>         .getSideOutput(…)
>
>         .keyBy(keySelector)
>
>         .assignTimestampsAndWatermarks(watermarkStrategy)
>
>
>
> windowedStream
>
>         .process(new ProcessWindowFunction3())
>
>         .keyBy(keySelector)
>
>         .connect(DataStreamUtils.reinterpretAsKeyedStream(
>                 openedEventsTimestamped, keySelector))
>
>         .process(...)
>
>
>
> Could this lead to delays or alignment issues?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Parag Somani <somanipa...@gmail.com>
> *Sent:* Wednesday, 20 October 2021 09:22
> *To:* Caizhi Weng <tsreape...@gmail.com>
> *Cc:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>; Flink
> ML <user@flink.apache.org>
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> I had a similar problem, where I had two concurrent checkpoints
> configured. Also, I was saving them to S3 (using MinIO) in a k8s 1.18
> environment.
>
>
>
> The Flink service kept getting restarted and checkpoints were timing out.
> It got resolved as follows:
>
> 1. MinIO had run out of disk space, which caused checkpoints to fail (this
> was the main cause).
>
> 2. I set the checkpoint concurrency and pause parameters to address it:
>
> execution.checkpointing.max-concurrent-checkpoints and
> execution.checkpointing.min-pause
>
> Details are at:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing
>
>
>
>
>
> On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
> Hi!
>
>
>
> I see you're using sliding event time windows. What are the exact values
> of windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes
> is large and windowSlideTimeMinutes is small, then each record may be
> assigned to a large number of windows as the pipeline proceeds, which
> gradually slows down checkpointing and eventually causes a timeout.
>
>
>
> On Tue, 19 Oct 2021 at 7:29 PM, Alexis Sarda-Espinosa
> <alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello everyone,
>
>
>
> I am doing performance tests for one of our streaming applications and,
> after increasing the throughput a bit (~500 events per minute), it has
> started failing because checkpoints cannot be completed within 10 minutes.
> The Flink cluster is not exactly under my control and is running on
> Kubernetes with version 1.11.3 and RocksDB backend.
>
>
>
> I can access the UI and logs and have confirmed:
>
>
>
>    - Logs do indicate expired checkpoints.
>    - There is no backpressure in any operator.
>    - When checkpoints do complete (seemingly at random):
>       - Size is 10-20MB.
>       - Sync and Async durations are at most 1-2 seconds.
>       - In one of the tasks, alignment takes 1-3 minutes, but start
>       delays grow to up to 5 minutes.
>
>
>    - The aforementioned task (the one with 5-minute start delay) has 8
>    sub-tasks and I see no indication of data skew. When the checkpoint times
>    out, none of the sub-tasks have acknowledged the checkpoint.
>
>
>
> The problematic task that is failing very often (and holding back
> downstream tasks) consists of the following operations:
>
>
>
> timestampedEventStream = events
>
>                 .keyBy(keySelector)
>
>                 .assignTimestampsAndWatermarks(watermarkStrategy);
>
>
>
> windowedStream = DataStreamUtils
>                 .reinterpretAsKeyedStream(timestampedEventStream, keySelector)
>
>                 .window(SlidingEventTimeWindows.of(
>
>                         Time.minutes(windowLengthMinutes),
>
>                         Time.minutes(windowSlideTimeMinutes)))
>
>                 .allowedLateness(Time.minutes(allowedLatenessMinutes));
>
>
>
> windowedStream
>
>                     .process(new ProcessWindowFunction1(config))
>
>                     // add sink
>
>
>
> windowedStream
>
>                     .process(new ProcessWindowFunction2(config))
>
>                     // add sink
>
>
>
> Both window functions are using managed state, but nothing out of the
> ordinary (as mentioned above, state size is actually very small). Do note
> that the same windowedStream is used twice.
>
>
>
> I don’t see any obvious runtime issues and I don’t think the load is
> particularly high, but maybe there’s something wrong in my pipeline
> definition? What else could cause these timeouts?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>
> --
>
> Regards,
> Parag Surajmal Somani.
>
