This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 106280e  [FLINK-26164] Document watermark alignment
106280e is described below

commit 106280e10a96d729943985986198b942446197d9
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Feb 15 16:36:33 2022 +0100

    [FLINK-26164] Document watermark alignment

    This closes #18783
---
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 2 files changed, 140 insertions(+)

diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index bed1bc5..46294f1 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -165,6 +165,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
+## Watermark alignment _`Beta`_
+
+In the previous paragraph we discussed a situation where splits/partitions/shards or sources are idle
+and can stall the progress of watermarks. On the other side of the spectrum, a split/partition/shard or
+source may process records very fast and thus advance its watermark considerably faster than the
+others. This on its own is not a problem. However, it can become a problem for downstream operators
+that use watermarks to emit data.
+
+In this case, contrary to idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, such an operator might need to buffer an excessive amount
+of data coming from the fast inputs, as the minimal watermark from all of its inputs is held back by
+the lagging input. All records emitted by the fast input therefore have to be buffered
+in that downstream operator's state, which can lead to uncontrollable growth of the operator's
+state.
+
+To address the issue, you can enable watermark alignment, which makes sure that no
+sources/splits/shards/partitions advance their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+    .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or when watermarks are applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling alignment, you need to tell Flink which group the source should belong to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all sources
+belonging to that group. The third parameter describes how often the current maximal watermark
+should be updated. The downside of frequent updates is that more RPC messages travel between the TMs
+and the JM.
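The committed snippet configures only the `WatermarkStrategy` itself. As a minimal hedged sketch (not part of the patch above), assuming a Kafka FLIP-27 source with placeholder connection settings, attaching the alignment-enabled strategy could look like this:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkAlignmentSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder FLIP-27 source; alignment itself is configured only on the strategy.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")          // placeholder address
                .setTopics("input-topic")                       // placeholder topic
                .setGroupId("alignment-sketch")                 // placeholder group id
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Sources labelled "alignment-group-1" are kept within 20s of the group's
        // minimal watermark; the shared maximal watermark is refreshed every second.
        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

        DataStream<String> stream = env.fromSource(source, strategy, "aligned-kafka-source");
        stream.print();

        env.execute("watermark-alignment-sketch");
    }
}
```

Any other source passed to `env.fromSource` with a strategy carrying the same `alignment-group-1` label would be aligned against this one.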
+
+In order to achieve alignment, Flink pauses consuming from the source/task that generated a
+watermark too far into the future. In the meantime it continues reading records from
+other sources/tasks, which can move the combined watermark forward and thereby unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards within the same task.
+
+If, for example, two Kafka partitions that produce watermarks at a different pace get assigned to
+the same task, the watermark might not behave as expected. Fortunately, even in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}

 <a name="writing-watermarkgenerators"></a>

diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index 9d1bb83..fc20833 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -201,6 +201,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
+## Watermark alignment _`Beta`_
+
+In the previous paragraph we discussed a situation where splits/partitions/shards or sources are idle
+and can stall the progress of watermarks. On the other side of the spectrum, a split/partition/shard or
+source may process records very fast and thus advance its watermark considerably faster than the
+others. This on its own is not a problem. However, it can become a problem for downstream operators
+that use watermarks to emit data.
+
+In this case, contrary to idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, such an operator might need to buffer an excessive amount
+of data coming from the fast inputs, as the minimal watermark from all of its inputs is held back by
+the lagging input. All records emitted by the fast input therefore have to be buffered
+in that downstream operator's state, which can lead to uncontrollable growth of the operator's
+state.
+
+To address the issue, you can enable watermark alignment, which makes sure that no
+sources/splits/shards/partitions advance their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+    .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or when watermarks are applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling alignment, you need to tell Flink which group the source should belong to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all sources
+belonging to that group. The third parameter describes how often the current maximal watermark
+should be updated. The downside of frequent updates is that more RPC messages travel between the TMs
+and the JM.
+
+In order to achieve alignment, Flink pauses consuming from the source/task that generated a
+watermark too far into the future. In the meantime it continues reading records from
+other sources/tasks, which can move the combined watermark forward and thereby unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards within the same task.
+
+If, for example, two Kafka partitions that produce watermarks at a different pace get assigned to
+the same task, the watermark might not behave as expected. Fortunately, even in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}

 ## Writing WatermarkGenerators
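For recommendation 2 above, a minimal hedged sketch, continuing the placeholder example shown earlier in this email (same `env`, `source`, and `strategy` names, and an assumed partition count of 3 for the placeholder topic):

```java
// Hedged continuation of the earlier placeholder sketch: give the aligned source a
// parallelism equal to the (assumed) number of Kafka partitions, so each subtask
// reads a single partition and the "no alignment within one task" limitation does not apply.
DataStream<String> aligned = env
        .fromSource(source, strategy, "aligned-kafka-source")
        .setParallelism(3); // assumed partition count of the placeholder "input-topic"
```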