This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0779c91e581 [FLINK-34258][docs][table] Fix incorrect retract example for TableAggregateFunction 0779c91e581 is described below commit 0779c91e581dc16c4aef61d6cc27774f11495907 Author: Jane Chan <qingyue....@gmail.com> AuthorDate: Fri Feb 2 10:10:40 2024 +0800 [FLINK-34258][docs][table] Fix incorrect retract example for TableAggregateFunction This closes #24215 --- docs/content.zh/docs/dev/table/functions/udfs.md | 13 ++++++++----- docs/content/docs/dev/table/functions/udfs.md | 14 +++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md b/docs/content.zh/docs/dev/table/functions/udfs.md index bf10e3b6482..5527cfcb978 100644 --- a/docs/content.zh/docs/dev/table/functions/udfs.md +++ b/docs/content.zh/docs/dev/table/functions/udfs.md @@ -1892,7 +1892,10 @@ tab {{< /tabs >}} -下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 保存了上一次的最大的2个值,也保存了当前最大的2个值。注意:如果 TopN 中的 n 非常大,这种既保存上次的结果,也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 `accumulator` 中,然后在调用 `emitUpdateWithRetract` 方法时再进行计算。 +下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 保存了上一次的最大的2个值,也保存了当前最大的2个值。 +{{< hint info >}} +注意:请不要在 `emitUpdateWithRetract` 方法中更新 accumulator,因为在调用 `function#emitUpdateWithRetract` 之后,`GroupTableAggFunction` 不会重新调用 `function#getAccumulators` 来将最新的 accumulator 更新到状态中。 +{{< /hint >}} {{< tabs "e0d841fe-8d95-4706-9e19-e76141171966" >}} {{< tab "Java" >}} @@ -1923,6 +1926,8 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer> } public void accumulate(Top2Accum acc, Integer v) { + acc.oldFirst = acc.first; + acc.oldSecond = acc.second; if (v > acc.first) { acc.second = acc.first; acc.first = v; @@ -1938,7 +1943,6 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer> out.retract(Tuple2.of(acc.oldFirst, 1)); } out.collect(Tuple2.of(acc.first, 1)); - acc.oldFirst = acc.first; } if (!acc.second.equals(acc.oldSecond)) { @@ -1947,7 +1951,6 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer> out.retract(Tuple2.of(acc.oldSecond, 2)); } out.collect(Tuple2.of(acc.second, 2)); - acc.oldSecond = acc.second; } } } @@ -1997,6 +2000,8 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum } def accumulate(acc: Top2Accum, v: Int) { + acc.oldFirst = acc.first + acc.oldSecond = acc.second if (v > acc.first) { acc.second = acc.first acc.first = v @@ -2015,7 +2020,6 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum out.retract(JTuple2.of(acc.oldFirst, 1)) } out.collect(JTuple2.of(acc.first, 1)) - acc.oldFirst = acc.first } if (acc.second != acc.oldSecond) { // if there is an update, retract old value then emit new value. @@ -2023,7 +2027,6 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum out.retract(JTuple2.of(acc.oldSecond, 2)) } out.collect(JTuple2.of(acc.second, 2)) - acc.oldSecond = acc.second } } } diff --git a/docs/content/docs/dev/table/functions/udfs.md b/docs/content/docs/dev/table/functions/udfs.md index 45f981f430c..b0c095f26b8 100644 --- a/docs/content/docs/dev/table/functions/udfs.md +++ b/docs/content/docs/dev/table/functions/udfs.md @@ -1778,9 +1778,9 @@ def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit The following example shows how to use the `emitUpdateWithRetract(...)` method to emit only incremental updates. In order to do so, the accumulator keeps both the old and new top 2 values. -If the N of Top N is big, it might be inefficient to keep both the old and new values. One way to -solve this case is to store only the input record in the accumulator in `accumulate` method and then perform -a calculation in `emitUpdateWithRetract`. +{{< hint info >}} +Note: Do not update accumulator within `emitUpdateWithRetract` because after `function#emitUpdateWithRetract` is invoked, `GroupTableAggFunction` will not re-invoke `function#getAccumulators` to update the latest accumulator to state. +{{< /hint >}} {{< tabs "043e94c6-05b5-4800-9e5f-7d11235f3a11" >}} {{< tab "Java" >}} @@ -1809,6 +1809,8 @@ public static class Top2WithRetract } public void accumulate(Top2WithRetractAccumulator acc, Integer v) { + acc.oldFirst = acc.first; + acc.oldSecond = acc.second; if (v > acc.first) { acc.second = acc.first; acc.first = v; @@ -1826,7 +1828,6 @@ public static class Top2WithRetract out.retract(Tuple2.of(acc.oldFirst, 1)); } out.collect(Tuple2.of(acc.first, 1)); - acc.oldFirst = acc.first; } if (!acc.second.equals(acc.oldSecond)) { // if there is an update, retract the old value then emit a new value @@ -1834,7 +1835,6 @@ public static class Top2WithRetract out.retract(Tuple2.of(acc.oldSecond, 2)); } out.collect(Tuple2.of(acc.second, 2)); - acc.oldSecond = acc.second; } } } @@ -1866,6 +1866,8 @@ class Top2WithRetract } def accumulate(acc: Top2WithRetractAccumulator, value: Integer): Unit = { + acc.oldFirst = acc.first + acc.oldSecond = acc.second if (value > acc.first) { acc.second = acc.first acc.first = value @@ -1884,7 +1886,6 @@ class Top2WithRetract out.retract(Tuple2.of(acc.oldFirst, 1)) } out.collect(Tuple2.of(acc.first, 1)) - acc.oldFirst = acc.first } if (!acc.second.equals(acc.oldSecond)) { // if there is an update, retract the old value then emit a new value @@ -1892,7 +1893,6 @@ class Top2WithRetract out.retract(Tuple2.of(acc.oldSecond, 2)) } out.collect(Tuple2.of(acc.second, 2)) - acc.oldSecond = acc.second } } }