This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push:
     new e11781f4d add low latency techniques blog post part2
e11781f4d is described below

commit e11781f4dd965d0561d1199ae4dd13e7f596afd4
Author: Jun Qin <11677043+qinjunje...@users.noreply.github.com>
AuthorDate: Tue May 17 20:13:28 2022 +0200

    add low latency techniques blog post part2
---
 _posts/2022-05-23-latency-part2.md                 |  97 +++++++++++++++++++++
 img/blog/2022-05-23-latency-part2/async-io.png     | Bin 0 -> 110500 bytes
 .../enriching-with-async-io.png                    | Bin 0 -> 246599 bytes
 img/blog/2022-05-23-latency-part2/spread-work.png  | Bin 0 -> 153305 bytes
 4 files changed, 97 insertions(+)

diff --git a/_posts/2022-05-23-latency-part2.md b/_posts/2022-05-23-latency-part2.md
new file mode 100644
index 000000000..2a96f1985
--- /dev/null
+++ b/_posts/2022-05-23-latency-part2.md
@@ -0,0 +1,97 @@
+---
+layout: post
+title: "Getting into Low-Latency Gears with Apache Flink - Part Two"
+date: 2022-05-23 00:00:00
+authors:
+- Jun Qin:
+  name: "Jun Qin"
+- Nico Kruber:
+  name: "Nico Kruber"
+excerpt: This multi-part series of blog posts presents a collection of low-latency techniques in Flink. Following part one, part two continues with a few more techniques that optimize latency directly.
+---
+
+This series of blog posts presents a collection of low-latency techniques in Flink. In [part one](https://flink.apache.org/2022/05/18/latency-part1.html), we discussed the types of latency in Flink and the way we measure end-to-end latency and presented a few techniques that optimize latency directly. In this post, we will continue with a few more direct latency optimization techniques. Just like in part one, for each optimization technique, we will clarify what it is, when to use it, and [...]
+
+
+# Direct latency optimization
+
+## Spread work across time
+
+When you use timers or do windowing in a job, timer or window firing may create load spikes due to heavy computation or state access. If the allocated resources cannot cope with these load spikes, timer or window firing will take a long time to finish. This often results in high latency.
+
+To avoid this situation, you should change your code to spread out the workload as much as possible so that work does not pile up at a single point in time. In the case of windowing, you should consider using incremental window aggregation with `AggregateFunction` or `ReduceFunction`. In the case of timers in a `ProcessFunction`, the operations executed in the `onTimer()` method should be optimized such that the time spent there is reduced to a minimum. If you [...]
+
+**You can apply this optimization** if you are using timer-based processing (e.g., timers, windowing) and an efficient aggregation can be applied whenever an event arrives instead of waiting for timers to fire.
+
+**Keep in mind** that when you spread work across time, you should consider not only computation but also state access, especially when using RocksDB. Spreading one type of work while accumulating the other may result in higher latencies.
+
+[WindowingJob](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/WindowingJob.java) already does incremental window aggregation with `AggregateFunction`. To show the latency improvement of this technique, we compared [WindowingJob](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/WindowingJob.java) with a variant that does not do incremental aggregation, [WindowingJobNoAggregation](https: [...]
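To make the idea of incremental window aggregation concrete, here is a minimal plain-Java sketch. It is not taken from the commit or from the lab repository and deliberately has no Flink dependency: the four methods only mirror the shape of Flink's `AggregateFunction` contract (`createAccumulator`, `add`, `getResult`, `merge`), while the class name `IncrementalAverage`, the averaging logic, and the `averageOf` driver are assumptions made for illustration.

```java
import java.util.List;

/**
 * Plain-Java sketch (no Flink dependency) of incremental aggregation.
 * The method names mirror the shape of Flink's AggregateFunction contract;
 * the class itself and the averaging logic are illustrative assumptions.
 */
public class IncrementalAverage {

    /** Accumulator whose size stays constant no matter how many events arrive. */
    public static final class Acc {
        long count;
        double sum;
    }

    public Acc createAccumulator() {
        return new Acc();
    }

    /** Runs once per arriving event, so the work is spread across time. */
    public Acc add(double value, Acc acc) {
        acc.count++;
        acc.sum += value;
        return acc;
    }

    /** Runs when the window fires; only a single division is left to do. */
    public double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    /** Combines two partial accumulators (needed when windows are merged). */
    public Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    /** Simulates one window: events arrive one by one, then the window fires. */
    public static double averageOf(List<Double> events) {
        IncrementalAverage fn = new IncrementalAverage();
        Acc acc = fn.createAccumulator();
        for (double event : events) {
            acc = fn.add(event, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(averageOf(List.of(1.0, 2.0, 3.0, 4.0))); // prints 2.5
    }
}
```

The point of the sketch is that the per-window state is two numbers rather than a buffer of all events, so neither the computation nor the state access grows with the window size at firing time.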
+
+
+<center>
+<img vspace="8" style="width:50%" src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/spread-work.png" />
+</center>
+
+
+## Access external systems efficiently
+
+### Using async I/O
+
+When interacting with external systems (e.g., RDBMS, object stores, web services) in a Flink job for data enrichment, the latency in getting responses from external systems often dominates the overall latency of the job. With Flink’s [Async I/O API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html) (e.g., `AsyncDataStream.unorderedWait()` or `AsyncDataStream.orderedWait()`), a single parallel function instance can handle many requests concurrently [...]
+
+<center>
+<img vspace="8" style="width:50%" src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/async-io.png" />
+</center>
+
+**You can apply this optimization** if the client of your external system supports asynchronous requests. If it does not, you can use a thread pool of multiple clients to handle synchronous requests in parallel. You can also use a cache to speed up lookups if the data in the external system does not change frequently. A cache, however, comes at the cost of working with outdated data.
+
+In this experiment, we simulated an external system that returns each response after a random delay of 1 to 6 ms, and we kept the external system's responses in a cache in our job for 1 s. The results below show the comparison between two jobs: [EnrichingJobSync](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/EnrichingJobSync.java) and [EnrichingJobAsync](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/j [...]
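The thread-pool fallback for clients without asynchronous support can be sketched in plain Java as follows. This is not code from the commit or from the lab repository and it does not use Flink's Async I/O API: `AsyncEnrichmentSketch`, `lookup`, and `enrichAll` are hypothetical names, and the 5 ms sleep merely stands in for a blocking call to an external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-Java sketch: overlapping blocking lookups with a thread pool. */
public class AsyncEnrichmentSketch {

    /** Hypothetical blocking client call; the sleep simulates response latency. */
    static String lookup(int key) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-" + key;
    }

    /** Submits every lookup before waiting on any of them, so the waits overlap. */
    public static List<String> enrichAll(List<Integer> keys, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int key : keys) {
            pending.add(CompletableFuture.supplyAsync(() -> key + "->" + lookup(key), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join()); // collected in input order
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(enrichAll(List.of(1, 2, 3), pool));
            // prints [1->value-1, 2->value-2, 3->value-3]
        } finally {
            pool.shutdown();
        }
    }
}
```

With a synchronous loop, total waiting time is roughly the sum of all response times; here it is bounded by the pool size and the slowest responses instead. Collecting results in input order is a choice made for this sketch; emitting results as soon as they complete would trade ordering for lower latency.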
+
+
+<center>
+<img vspace="8" style="width:50%" src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png" />
+</center>
+
+### Using a streaming join
+
+If you are enriching a stream of events with an external database where the data changes frequently, and the changes can be converted to a data stream, then you have another option: use [connected streams]({{site.DOCS_BASE_URL}}flink-docs-stable/docs/dev/datastream/operators/overview/#datastreamdatastream-rarr-connectedstream) and a [CoProcessFunction]({{site.DOCS_BASE_URL}}flink-docs-stable/docs/dev/datastream/operators/process_function/#low-level-joins) to do a streaming join. This [...]
+
+
+## Tune checkpointing
+
+There are two aspects of checkpointing that impact latency: checkpoint alignment time, and checkpoint frequency and duration in the case of end-to-end exactly-once with transactional sinks.
+
+### Reduce checkpoint alignment time
+
+During checkpoint alignment, operators block the event processing from the channels where checkpoint barriers have been received in order to wait for the checkpoint barriers from other channels. Longer alignment time will result in higher latencies.
+
+There are different ways to reduce checkpoint alignment time:
+
+* Improve the throughput. Any improvement in throughput helps process the buffers sitting in front of a checkpoint barrier faster.
+* Scale up or scale out. This is the same as the technique of “allocate enough resources” described in [part one](https://flink.apache.org/2022/05/18/latency-part1.html). Increased processing power helps reduce backpressure and checkpoint alignment time.
+* Use unaligned checkpointing. In this case, checkpoint barriers do not wait until the buffered data is processed but overtake it and are passed on to the next operator immediately. The overtaken data, however, has to be checkpointed as well to keep the checkpoint consistent. Flink can also be configured to automatically switch over from aligned to unaligned checkpointing after a certain alignment time has passed.
+* Buffer less data. You can reduce the buffered data size by tuning the number of exclusive and floating buffers. With less data buffered in the network stack, the checkpoint barrier can arrive at operators more quickly. However, reducing buffers has an adverse effect on throughput and is just mentioned here for completeness. Flink 1.14 improves buffer handling by introducing a feature called *buffer debloating*. Buffer debloating can dynamically adjust buffer size based on the current throug [...]
+
+
+### Tune checkpoint duration and frequency
+
+If you are working with transactional sinks with exactly-once semantics, the output events are committed to external systems (e.g., Kafka) *only* upon checkpoint completion. In this case, tuning other options may not help if you do not tune checkpointing. Instead, you need fast and more frequent checkpointing.
+
+To have fast checkpointing, you need to reduce the checkpoint duration. To achieve that, you can, for example, turn on RocksDB incremental checkpointing, reduce the state stored in Flink, clean up state that is not needed anymore, avoid putting caches into managed state, store only necessary fields in state, optimize the serialization format, etc. You can also scale up or scale out, same as the technique of “allocate enough resources” described in [part one](https://flink.apache.org/2022/05/ [...]
+
+To have more frequent checkpointing, you can reduce the checkpoint interval or the minimum pause between checkpoints, or use concurrent checkpoints. But keep in mind that concurrent checkpoints introduce more runtime overhead.
+
+Another option is to not use exactly-once sinks but to switch to at-least-once sinks. As a result, you may get (correct but) duplicated output events, so the downstream application that consumes the output events of your jobs may have to perform additional deduplication.
+
+
+## Process events on arrival
+In a stream processing pipeline, there often exists a delay between the time an event is received and the time the event can be processed (e.g., after having seen all events up to a certain point in event time). The amount of delay may be significant for those pipelines with very low latency requirements. For example, a fraud detection job usually requires a sub-second level of latency. In this case, you could process events with [ProcessFunction]({{site.DOCS_BASE_URL}}flink-docs-stable/ [...]
+
+**You can apply this optimization** if your job has a sub-second level latency requirement (e.g., hundreds of milliseconds) and the reduced watermarking interval still contributes a significant part of the latency.
+
+**Keep in mind** that this may change your job logic considerably since you have to deal with out-of-order events yourself.
+
+# Summary
+
+Following part one, this blog post presented a few more latency optimization techniques with a focus on direct latency optimization. In the next part, we will focus on techniques that optimize latency by increasing throughput. Stay tuned!
+
diff --git a/img/blog/2022-05-23-latency-part2/async-io.png b/img/blog/2022-05-23-latency-part2/async-io.png
new file mode 100644
index 000000000..b1d836133
Binary files /dev/null and b/img/blog/2022-05-23-latency-part2/async-io.png differ
diff --git a/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png b/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png
new file mode 100644
index 000000000..fa11bf98c
Binary files /dev/null and b/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png differ
diff --git a/img/blog/2022-05-23-latency-part2/spread-work.png b/img/blog/2022-05-23-latency-part2/spread-work.png
new file mode 100644
index 000000000..e0c40c5d0
Binary files /dev/null and b/img/blog/2022-05-23-latency-part2/spread-work.png differ