This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0779c91e581 [FLINK-34258][docs][table] Fix incorrect retract example for TableAggregateFunction
0779c91e581 is described below

commit 0779c91e581dc16c4aef61d6cc27774f11495907
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Fri Feb 2 10:10:40 2024 +0800

    [FLINK-34258][docs][table] Fix incorrect retract example for TableAggregateFunction
    
    This closes #24215
---
 docs/content.zh/docs/dev/table/functions/udfs.md | 13 ++++++++-----
 docs/content/docs/dev/table/functions/udfs.md    | 14 +++++++-------
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md b/docs/content.zh/docs/dev/table/functions/udfs.md
index bf10e3b6482..5527cfcb978 100644
--- a/docs/content.zh/docs/dev/table/functions/udfs.md
+++ b/docs/content.zh/docs/dev/table/functions/udfs.md
@@ -1892,7 +1892,10 @@ tab
 {{< /tabs >}}
 
 
-下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 
保存了上一次的最大的2个值,也保存了当前最大的2个值。注意:如果 TopN 中的 n 
非常大,这种既保存上次的结果,也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 `accumulator` 中,然后在调用 
`emitUpdateWithRetract` 方法时再进行计算。
+下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 
保存了上一次的最大的2个值,也保存了当前最大的2个值。
+{{< hint info >}}
+注意:请不要在 `emitUpdateWithRetract` 方法中更新 accumulator,因为在调用 
`function#emitUpdateWithRetract` 之后,`GroupTableAggFunction` 不会重新调用 
`function#getAccumulators` 来将最新的 accumulator 更新到状态中。
+{{< /hint >}}
 
 {{< tabs "e0d841fe-8d95-4706-9e19-e76141171966" >}}
 {{< tab "Java" >}}
@@ -1923,6 +1926,8 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
     }
 
     public void accumulate(Top2Accum acc, Integer v) {
+        acc.oldFirst = acc.first;
+        acc.oldSecond = acc.second;
         if (v > acc.first) {
             acc.second = acc.first;
             acc.first = v;
@@ -1938,7 +1943,6 @@ public static class Top2 extends 
TableAggregateFunction<Tuple2<Integer, Integer>
                 out.retract(Tuple2.of(acc.oldFirst, 1));
             }
             out.collect(Tuple2.of(acc.first, 1));
-            acc.oldFirst = acc.first;
         }
 
         if (!acc.second.equals(acc.oldSecond)) {
@@ -1947,7 +1951,6 @@ public static class Top2 extends 
TableAggregateFunction<Tuple2<Integer, Integer>
                 out.retract(Tuple2.of(acc.oldSecond, 2));
             }
             out.collect(Tuple2.of(acc.second, 2));
-            acc.oldSecond = acc.second;
         }
     }
 }
@@ -1997,6 +2000,8 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
   }
 
   def accumulate(acc: Top2Accum, v: Int) {
+    acc.oldFirst = acc.first
+    acc.oldSecond = acc.second
     if (v > acc.first) {
       acc.second = acc.first
       acc.first = v
@@ -2015,7 +2020,6 @@ class Top2 extends 
TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
         out.retract(JTuple2.of(acc.oldFirst, 1))
       }
       out.collect(JTuple2.of(acc.first, 1))
-      acc.oldFirst = acc.first
     }
     if (acc.second != acc.oldSecond) {
       // if there is an update, retract old value then emit new value.
@@ -2023,7 +2027,6 @@ class Top2 extends 
TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
         out.retract(JTuple2.of(acc.oldSecond, 2))
       }
       out.collect(JTuple2.of(acc.second, 2))
-      acc.oldSecond = acc.second
     }
   }
 }
diff --git a/docs/content/docs/dev/table/functions/udfs.md b/docs/content/docs/dev/table/functions/udfs.md
index 45f981f430c..b0c095f26b8 100644
--- a/docs/content/docs/dev/table/functions/udfs.md
+++ b/docs/content/docs/dev/table/functions/udfs.md
@@ -1778,9 +1778,9 @@ def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit
 The following example shows how to use the `emitUpdateWithRetract(...)` method 
to emit only incremental
 updates. In order to do so, the accumulator keeps both the old and new top 2 
values.
 
-If the N of Top N is big, it might be inefficient to keep both the old and new 
values. One way to
-solve this case is to store only the input record in the accumulator in 
`accumulate` method and then perform
-a calculation in `emitUpdateWithRetract`.
+{{< hint info >}}
+Note: Do not update the accumulator within `emitUpdateWithRetract`, because after `function#emitUpdateWithRetract` is invoked, `GroupTableAggFunction` will not re-invoke `function#getAccumulators` to write the latest accumulator back to state.
+{{< /hint >}}
 
 {{< tabs "043e94c6-05b5-4800-9e5f-7d11235f3a11" >}}
 {{< tab "Java" >}}
@@ -1809,6 +1809,8 @@ public static class Top2WithRetract
   }
 
   public void accumulate(Top2WithRetractAccumulator acc, Integer v) {
+    acc.oldFirst = acc.first;
+    acc.oldSecond = acc.second;
     if (v > acc.first) {
       acc.second = acc.first;
       acc.first = v;
@@ -1826,7 +1828,6 @@ public static class Top2WithRetract
           out.retract(Tuple2.of(acc.oldFirst, 1));
       }
       out.collect(Tuple2.of(acc.first, 1));
-      acc.oldFirst = acc.first;
     }
     if (!acc.second.equals(acc.oldSecond)) {
       // if there is an update, retract the old value then emit a new value
@@ -1834,7 +1835,6 @@ public static class Top2WithRetract
           out.retract(Tuple2.of(acc.oldSecond, 2));
       }
       out.collect(Tuple2.of(acc.second, 2));
-      acc.oldSecond = acc.second;
     }
   }
 }
@@ -1866,6 +1866,8 @@ class Top2WithRetract
   }
 
   def accumulate(acc: Top2WithRetractAccumulator, value: Integer): Unit = {
+    acc.oldFirst = acc.first
+    acc.oldSecond = acc.second
     if (value > acc.first) {
       acc.second = acc.first
       acc.first = value
@@ -1884,7 +1886,6 @@ class Top2WithRetract
           out.retract(Tuple2.of(acc.oldFirst, 1))
       }
       out.collect(Tuple2.of(acc.first, 1))
-      acc.oldFirst = acc.first
     }
     if (!acc.second.equals(acc.oldSecond)) {
       // if there is an update, retract the old value then emit a new value
@@ -1892,7 +1893,6 @@ class Top2WithRetract
           out.retract(Tuple2.of(acc.oldSecond, 2))
       }
       out.collect(Tuple2.of(acc.second, 2))
-      acc.oldSecond = acc.second
     }
   }
 }

Reply via email to