This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 106280e  [FLINK-26164] Document watermark alignment
106280e is described below

commit 106280e10a96d729943985986198b942446197d9
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Feb 15 16:36:33 2022 +0100

    [FLINK-26164] Document watermark alignment
    
    This closes #18783
---
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 2 files changed, 140 insertions(+)

diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index bed1bc5..46294f1 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -165,6 +165,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
 
+## Watermark alignment _`Beta`_
+
+The previous section discussed the situation where splits/partitions/shards or sources are idle
+and stall the progress of watermarks. At the other end of the spectrum, a split/partition/shard or
+source may process records very fast and, in turn, advance its watermark much faster than the
+others. This is not a problem on its own. However, it can become a problem for downstream operators
+that rely on watermarks to emit data.
+
+In this case, unlike with idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, the operator might need to buffer an excessive amount
+of data coming from the fast inputs, because the minimal watermark across all of its inputs is held
+back by the lagging input. All records emitted by the fast input therefore have to be buffered
+in the state of that downstream operator, which can lead to uncontrolled growth of the operator's
+state.
+
+To address this issue, you can enable watermark alignment, which makes sure that no
+sources/splits/shards/partitions advance their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or if applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling the alignment, you need to tell Flink which group the source belongs to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all sources
+belonging to that group. The third parameter describes how often the current maximal watermark
+should be updated. The downside of frequent updates is that more RPC messages travel
+between the TaskManagers (TMs) and the JobManager (JM).
+
+To achieve the alignment, Flink pauses consuming from the source/task that generated a
+watermark that is too far into the future. In the meantime, it continues reading records from
+other sources/tasks, which can move the combined watermark forward and thereby unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards in the same task.
+
+If, for example, two Kafka partitions that produce watermarks at a different pace are assigned
+to the same task, watermarks might not behave as expected. Fortunately, in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}
 
 <a name="writing-watermarkgenerators"></a>
 
diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index 9d1bb83..fc20833 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -201,6 +201,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
 
+## Watermark alignment _`Beta`_
+
+The previous section discussed the situation where splits/partitions/shards or sources are idle
+and stall the progress of watermarks. At the other end of the spectrum, a split/partition/shard or
+source may process records very fast and, in turn, advance its watermark much faster than the
+others. This is not a problem on its own. However, it can become a problem for downstream operators
+that rely on watermarks to emit data.
+
+In this case, unlike with idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, the operator might need to buffer an excessive amount
+of data coming from the fast inputs, because the minimal watermark across all of its inputs is held
+back by the lagging input. All records emitted by the fast input therefore have to be buffered
+in the state of that downstream operator, which can lead to uncontrolled growth of the operator's
+state.
+
+To address this issue, you can enable watermark alignment, which makes sure that no
+sources/splits/shards/partitions advance their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or if applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling the alignment, you need to tell Flink which group the source belongs to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all sources
+belonging to that group. The third parameter describes how often the current maximal watermark
+should be updated. The downside of frequent updates is that more RPC messages travel
+between the TaskManagers (TMs) and the JobManager (JM).
+
+To achieve the alignment, Flink pauses consuming from the source/task that generated a
+watermark that is too far into the future. In the meantime, it continues reading records from
+other sources/tasks, which can move the combined watermark forward and thereby unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards in the same task.
+
+If, for example, two Kafka partitions that produce watermarks at a different pace are assigned
+to the same task, watermarks might not behave as expected. Fortunately, in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}
 
 ## Writing WatermarkGenerators
 

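Illustration (not part of the commit): the alignment rule documented in the diff above can be modelled in a few lines of plain Java. This is only a sketch with no Flink dependency. The class `WatermarkAlignmentSketch` and its methods are hypothetical names, and the rule it encodes (pause every source whose watermark exceeds the group's minimal watermark plus the maximal drift) is a simplified reading of the documentation text, not Flink's actual implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model of watermark alignment; this is not Flink code. */
public class WatermarkAlignmentSketch {

    /**
     * Highest watermark any source of the group may reach:
     * the minimal watermark of the group plus the maximal drift.
     * Assumes maxDriftMillis is non-negative.
     */
    static long maxAllowedWatermark(Map<String, Long> watermarks, long maxDriftMillis) {
        if (watermarks.isEmpty()) {
            return Long.MAX_VALUE; // nothing to align against
        }
        long min = Long.MAX_VALUE;
        for (long watermark : watermarks.values()) {
            min = Math.min(min, watermark);
        }
        // Saturate instead of overflowing when the minimum is already very large.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    /** Names of the sources that are too far ahead and would be paused. */
    static Set<String> sourcesToPause(Map<String, Long> watermarks, long maxDriftMillis) {
        long limit = maxAllowedWatermark(watermarks, maxDriftMillis);
        Set<String> paused = new TreeSet<>();
        for (Map.Entry<String, Long> entry : watermarks.entrySet()) {
            if (entry.getValue() > limit) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        // Two sources in the same alignment group; watermarks in epoch milliseconds.
        Map<String, Long> group = Map.of("kafka", 50_000L, "files", 10_000L);
        long maxDriftMillis = 20_000L; // corresponds to Duration.ofSeconds(20)

        System.out.println(maxAllowedWatermark(group, maxDriftMillis)); // prints 30000
        System.out.println(sourcesToPause(group, maxDriftMillis));      // prints [kafka]
    }
}
```

In this model the fast source stays paused until the slow one advances far enough that the limit moves past the fast source's watermark, which mirrors the "unblock the faster one" behaviour described in the documentation.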