This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c6943c3f713437edf93b547e1f42addc6ed98b4
Author: dylanhz <[email protected]>
AuthorDate: Tue Mar 24 21:10:21 2026 +0800

    [FLINK-39187][table] Add the built-in function BITMAP_BUILD_AGG
---
 .../docs/sql/functions/built-in-functions.md       |   4 +
 .../docs/sql/functions/built-in-functions.md       |   4 +
 docs/data/sql_functions.yml                        |  10 ++
 docs/data/sql_functions_zh.yml                     |  10 ++
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |   8 +
 .../pyflink/table/tests/test_expression.py         |   1 +
 .../flink/table/api/internal/BaseExpressions.java  |  10 ++
 .../functions/BuiltInFunctionDefinitions.java      |  12 ++
 .../planner/plan/utils/AggFunctionFactory.scala    |  12 ++
 .../planner/functions/BitmapAggFunctionITCase.java | 146 +++++++++++++++
 .../runtime/batch/sql/OverAggregateITCase.scala    |  30 ++++
 .../runtime/stream/sql/AggregateITCase.scala       |  71 ++++++++
 .../runtime/stream/sql/OverAggregateITCase.scala   |  39 ++++
 .../runtime/stream/sql/WindowAggregateITCase.scala |  26 +++
 .../runtime/stream/table/AggregateITCase.scala     |  29 +++
 .../aggregate/BitmapBuildAggFunction.java          |  97 ++++++++++
 .../BitmapBuildWithRetractAggFunction.java         | 199 +++++++++++++++++++++
 18 files changed, 709 insertions(+)

diff --git a/docs/content.zh/docs/sql/functions/built-in-functions.md 
b/docs/content.zh/docs/sql/functions/built-in-functions.md
index 70fa6bc5e84..f931534ce6e 100644
--- a/docs/content.zh/docs/sql/functions/built-in-functions.md
+++ b/docs/content.zh/docs/sql/functions/built-in-functions.md
@@ -122,6 +122,10 @@ JSON 函数使用符合 ISO/IEC TR 19075-6 SQL标准的 JSON 路径表达式。
 
 {{< sql_functions_zh "aggregate" >}}
 
+### 位图聚合函数
+
+{{< sql_functions_zh "bitmapagg" >}}
+
 时间间隔单位和时间点单位标识符
 ---------------------------------------
 
diff --git a/docs/content/docs/sql/functions/built-in-functions.md 
b/docs/content/docs/sql/functions/built-in-functions.md
index f85ae93e486..3ba23b32f33 100644
--- a/docs/content/docs/sql/functions/built-in-functions.md
+++ b/docs/content/docs/sql/functions/built-in-functions.md
@@ -125,6 +125,10 @@ The aggregate functions take an expression across all the 
rows as the input and
 
 {{< sql_functions "aggregate" >}}
 
+### Bitmap Aggregate Functions
+
+{{< sql_functions "bitmapagg" >}}
+
 Time Interval and Point Unit Specifiers
 ---------------------------------------
 
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 8f92038de69..fd7fc55c949 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1568,6 +1568,16 @@ aggregate:
       FROM orders
       ```
 
+bitmapagg:
+  - sql: BITMAP_BUILD_AGG(value)
+    table: value.bitmapBuildAgg()
+    description: |
+      Aggregates 32-bit integers into a bitmap.
+      
+      `value INT`
+      
+      Returns a `BITMAP`.
+
 catalog:
   - sql: CURRENT_DATABASE()
     table: currentDatabase()
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index c28996064d7..c74b830968a 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1652,6 +1652,16 @@ aggregate:
       FROM orders
       ```
 
+bitmapagg:
+  - sql: BITMAP_BUILD_AGG(value)
+    table: value.bitmapBuildAgg()
+    description: |
+      将 32 位整数聚合成位图。
+      
+      `value INT`
+      
+      返回一个 `BITMAP`。
+
 catalog:
   - sql: CURRENT_DATABASE()
     table: currentDatabase()
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 9ad848c167f..727f5e979b0 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -348,6 +348,7 @@ Bitmap functions
     Expression.bitmap_and
     Expression.bitmap_andnot
     Expression.bitmap_build
+    Expression.bitmap_build_agg
     Expression.bitmap_cardinality
     Expression.bitmap_from_bytes
     Expression.bitmap_or
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index a251a2dfcda..9241bbadf55 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -2309,6 +2309,14 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapBuild")(self)
 
+    def bitmap_build_agg(self):
+        """
+        Aggregates 32-bit integers into a bitmap.
+
+        :return: a BITMAP expression
+        """
+        return _unary_op("bitmapBuildAgg")(self)
+
     def bitmap_cardinality(self) -> 'Expression':
         """
         Returns the cardinality of a bitmap.
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index edc0dab1770..f6d10fc99c5 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -269,6 +269,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2)))
         self.assertEqual("BITMAP_ANDNOT(a, b)", 
str(expr1.bitmap_andnot(expr2)))
         self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build()))
+        self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg()))
         self.assertEqual("BITMAP_CARDINALITY(a)", 
str(expr1.bitmap_cardinality()))
         self.assertEqual("BITMAP_FROM_BYTES(a)", 
str(expr1.bitmap_from_bytes()))
         self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2)))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 3704f80ea8b..086ef39867e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -79,6 +79,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR;
@@ -2651,6 +2652,15 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr()));
     }
 
+    /**
+     * Aggregates 32-bit integers into a bitmap.
+     *
+     * @return a BITMAP expression
+     */
+    public OutType bitmapBuildAgg() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_AGG, 
toExpr()));
+    }
+
     /**
      * Returns the cardinality of a bitmap.
      *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 7dac1d91cc8..44c40421e30 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -3037,6 +3037,18 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_BUILD_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_BUILD_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("value"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.INTEGER))))
+                    .outputTypeStrategy(explicit(DataTypes.BITMAP()))
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_CARDINALITY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_CARDINALITY")
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index ce6bbece424..3eecd384abf 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -174,6 +174,8 @@ class AggFunctionFactory(
           // built-in imperativeFunction
           case BuiltInFunctionDefinitions.PERCENTILE =>
             createPercentileAggFunction(argTypes)
+          case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG =>
+            createBitmapBuildAggFunction(argTypes, index)
           // DeclarativeAggregateFunction & UDF
           case _ =>
             bridge.getDefinition.asInstanceOf[UserDefinedFunction]
@@ -649,4 +651,14 @@ class AggFunctionFactory(
       new SinglePercentileAggFunction(firstArg, secondArg)
     }
   }
+
+  private def createBitmapBuildAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new BitmapBuildWithRetractAggFunction(argTypes(0))
+    } else {
+      new BitmapBuildAggFunction(argTypes(0))
+    }
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
new file mode 100644
index 00000000000..8312da400e3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.bitmap.Bitmap;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BITMAP;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.types.RowKind.UPDATE_BEFORE;
+
+/** Tests for built-in bitmap aggregation functions. */
+class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase {
+
+    @Override
+    Stream<TestSpec> getTestCaseSpecs() {
+        final List<TestSpec> specs = new ArrayList<>();
+        specs.addAll(bitmapBuildAggTestCases());
+        return specs.stream();
+    }
+
+    private List<TestSpec> bitmapBuildAggTestCases() {
+        return Arrays.asList(
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
+                        .withDescription("without retraction")
+                        .withSource(
+                                ROW(INT(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, 1, "A"),
+                                        Row.ofKind(INSERT, 2, "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(INSERT, 4, "A"),
+                                        Row.ofKind(INSERT, 3, "A"),
+                                        Row.ofKind(INSERT, 2, "B"),
+                                        Row.ofKind(INSERT, -1, "A"),
+                                        Row.ofKind(INSERT, 1, "B"),
+                                        Row.ofKind(INSERT, -1, "B"),
+                                        Row.ofKind(INSERT, null, "B"),
+                                        Row.ofKind(INSERT, null, "C")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_BUILD_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapBuildAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", Bitmap.fromArray(new int[] 
{-1, 1, 2, 3, 4})),
+                                        Row.of("B", Bitmap.fromArray(new int[] 
{-1, 1, 2})),
+                                        Row.of("C", null))),
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
+                        .withDescription("with retraction")
+                        .withSource(
+                                ROW(INT(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, 1, "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(DELETE, 1, "A"),
+                                        Row.ofKind(INSERT, 1, "B"),
+                                        Row.ofKind(DELETE, 1, "B"),
+                                        Row.ofKind(INSERT, null, "B"),
+                                        Row.ofKind(INSERT, 3, "B"),
+                                        Row.ofKind(DELETE, 2, "B"), // count < 0
+                                        Row.ofKind(INSERT, -1, "B"),
+                                        Row.ofKind(INSERT, 2, "B"),
+                                        Row.ofKind(INSERT, 2, "B"),
+                                        Row.ofKind(INSERT, 1, "B"),
+                                        Row.ofKind(DELETE, 1, "B"),
+                                        Row.ofKind(UPDATE_BEFORE, 3, "B"),
+                                        Row.ofKind(UPDATE_AFTER, 1, "B"),
+                                        Row.ofKind(INSERT, 2, "C"),
+                                        Row.ofKind(INSERT, 1, "C"),
+                                        Row.ofKind(INSERT, -1, "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(DELETE, 1, "C"),
+                                        Row.ofKind(DELETE, -1, "C"),
+                                        Row.ofKind(DELETE, null, "C"),
+                                        Row.ofKind(UPDATE_BEFORE, 2, "C"),
+                                        Row.ofKind(UPDATE_AFTER, 1, "C")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_BUILD_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapBuildAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", null),
+                                        Row.of("B", Bitmap.fromArray(new int[] 
{-1, 1, 2})),
+                                        Row.of("C", Bitmap.fromArray(new int[] 
{1})))),
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
+                        .withDescription("Validation Error")
+                        .withSource(
+                                ROW(BIGINT(), STRING()),
+                                Collections.singletonList(Row.ofKind(INSERT, 
1L, "A")))
+                        .testValidationError(
+                                source ->
+                                        "SELECT f1, BITMAP_BUILD_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapBuildAgg()),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_BUILD_AGG(value 
<INTEGER>)"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index b08a944b571..c9ca677c07a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -2974,6 +2974,36 @@ class OverAggregateITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testBitmapBuildAgg(): Unit = {
+    checkResult(
+      "SELECT " +
+        "d, e, " +
+        "BITMAP_BUILD_AGG(d) OVER (ORDER BY e ROWS BETWEEN 4 PRECEDING AND 
CURRENT ROW), " +
+        "BITMAP_BUILD_AGG(CAST(e AS INT)) OVER (ORDER BY e ROWS BETWEEN 4 
PRECEDING AND CURRENT ROW) " +
+        "FROM NullTable5",
+      Seq(
+        row(1, 1L, "{1}", "{1}"),
+        row(2, 2L, "{1,2}", "{1,2}"),
+        row(2, 3L, "{1,2}", "{1,2,3}"),
+        row(3, 4L, "{1,2,3}", "{1,2,3,4}"),
+        row(3, 5L, "{1,2,3}", "{1,2,3,4,5}"),
+        row(3, 6L, "{2,3}", "{2,3,4,5,6}"),
+        row(4, 7L, "{2,3,4}", "{3,4,5,6,7}"),
+        row(4, 8L, "{3,4}", "{4,5,6,7,8}"),
+        row(4, 9L, "{3,4}", "{5,6,7,8,9}"),
+        row(4, 10L, "{3,4}", "{6,7,8,9,10}"),
+        row(5, 11L, "{4,5}", "{7,8,9,10,11}"),
+        row(5, 12L, "{4,5}", "{8,9,10,11,12}"),
+        row(5, 13L, "{4,5}", "{9,10,11,12,13}"),
+        row(5, 14L, "{4,5}", "{10,11,12,13,14}"),
+        row(5, 15L, "{5}", "{11,12,13,14,15}"),
+        row(null, 999L, "{5}", "{12,13,14,15,999}"),
+        row(null, 999L, "{5}", "{13,14,15,999}")
+      )
+    )
+  }
 }
 
 /** The initial accumulator for count aggregate function */
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 3d0e189294e..cd2adf0154a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -2119,6 +2119,77 @@ class AggregateITCase(
 
     tEnv.dropTemporarySystemFunction("PERCENTILE")
   }
+
+  @TestTemplate
+  def testBitmapBuildAgg(): Unit = {
+    val data = new mutable.MutableList[(Int, Int, String)]
+    for (i <- 0 until 5) {
+      data.+=((i, -i, "a"))
+      data.+=((i * 2, -i * 2, "b"))
+    }
+
+    val sql =
+      """
+        |SELECT
+        |  c,
+        |  CAST(BITMAP_BUILD_AGG(a) AS STRING),
+        |  CAST(BITMAP_BUILD_AGG(b) AS STRING)
+        |FROM MyTable
+        |GROUP BY c
+      """.stripMargin
+
+    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      
s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer
+          .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}",
+      
s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer
+          .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testBitmapBuildAggWithRetract(): Unit = {
+    val data = new mutable.MutableList[(Int, Int, String)]
+    for (i <- 0 until 5) {
+      data.+=((i, i, "a"))
+      data.+=((i + 1, i * 2, "b"))
+      data.+=((i + 2, i * 3, "c"))
+    }
+
+    val inner =
+      """
+        |SELECT
+        |  MAX(a) AS ma,
+        |  LAST_VALUE(b) AS la,
+        |  c
+        |FROM MyTable
+        |GROUP BY c
+      """.stripMargin
+    val sql =
+      s"""
+         |SELECT
+         |  CAST(BITMAP_BUILD_AGG(ma) AS STRING),
+         |  CAST(BITMAP_BUILD_AGG(la) AS STRING)
+         |FROM ($inner)
+      """.stripMargin
+
+    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(s"{4,5,6},{4,8,12}")
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
 }
 
 object AggregateITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
index 82c58da9a87..c4deb3500a9 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
@@ -1499,6 +1499,45 @@ class OverAggregateITCase(mode: StateBackendMode, 
unboundedOverVersion: Int)
       }
     }
   }
+
+  @TestTemplate
+  def testBitmapBuildAgg(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  a, b,
+        |  BITMAP_BUILD_AGG(a) OVER (ORDER BY proctime ROWS BETWEEN 4 
PRECEDING AND CURRENT ROW),
+        |  BITMAP_BUILD_AGG(CAST(b AS INT)) OVER (ORDER BY proctime ROWS 
BETWEEN 4 PRECEDING AND CURRENT ROW)
+        |FROM MyTable
+      """.stripMargin
+
+    val t =
+      failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 
'proctime.proctime)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,1,{1},{1}",
+      "2,2,{1,2},{1,2}",
+      "2,3,{1,2},{1,2,3}",
+      "3,4,{1,2,3},{1,2,3,4}",
+      "3,5,{1,2,3},{1,2,3,4,5}",
+      "3,6,{2,3},{2,3,4,5,6}",
+      "4,7,{2,3,4},{3,4,5,6,7}",
+      "4,8,{3,4},{4,5,6,7,8}",
+      "4,9,{3,4},{5,6,7,8,9}",
+      "4,10,{3,4},{6,7,8,9,10}",
+      "5,11,{4,5},{7,8,9,10,11}",
+      "5,12,{4,5},{8,9,10,11,12}",
+      "5,13,{4,5},{9,10,11,12,13}",
+      "5,14,{4,5},{10,11,12,13,14}",
+      "5,15,{5},{11,12,13,14,15}"
+    )
+    assertThat(sink.getAppendResults).isEqualTo(expected)
+  }
 }
 
 object OverAggregateITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 12ac1f73680..0c498882137 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -988,6 +988,32 @@ class WindowAggregateITCase(
     }
   }
 
+  @TestTemplate
+  def testBitmapBuildAggOnEventTimeTumbleWindow(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  BITMAP_BUILD_AGG(`int`) as `bitmap`
+        |FROM TABLE(
+        |   TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+        |GROUP BY window_start, window_end
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2020-10-10T00:00,2020-10-10T00:00:05,{2,5,22}",
+      "2020-10-10T00:00:05,2020-10-10T00:00:10,{3,6}",
+      "2020-10-10T00:00:15,2020-10-10T00:00:20,{4}"
+    )
+    val result = sink.getAppendResults.sorted
+    assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n"))
+  }
+
   private def verifyWindowAgg(
       tvfFromClause: String,
       allExpectedData: Seq[String],
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 067bfd60905..29a6e80a2e7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -606,4 +606,33 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
       }
     }
   }
+
+  @TestTemplate
+  def testBitmapBuildAgg(): Unit = {
+    val data = new mutable.MutableList[(Int, Int, String)]
+    for (i <- 0 until 5) {
+      data.+=((i, -i, "a"))
+      data.+=((i * 2, -i * 2, "b"))
+    }
+
+    val t = failingDataSource(data)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('c)
+      .select(
+        'c,
+        'a.bitmapBuildAgg().cast(DataTypes.STRING()),
+        'b.bitmapBuildAgg().cast(DataTypes.STRING()))
+
+    val sink = new TestingRetractSink
+    t.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      
s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer
+          .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}",
+      
s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer
+          .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
new file mode 100644
index 00000000000..4e0215019e8
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_BUILD_AGG aggregate function. */
+@Internal
+public final class BitmapBuildAggFunction extends 
BuiltInAggregateFunction<Bitmap, Bitmap> {
+
+    private final transient DataType valueDataType;
+
+    public BitmapBuildAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Planning
+    // 
--------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.BITMAP().notNull();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Accumulator
+    // 
--------------------------------------------------------------------------------------------
+
+    @Override
+    public Bitmap createAccumulator() {
+        return Bitmap.empty();
+    }
+
+    public void resetAccumulator(Bitmap acc) {
+        acc.clear();
+    }
+
+    @Override
+    public Bitmap getValue(Bitmap acc) {
+        return acc.isEmpty() ? null : RoaringBitmapData.from(acc);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Runtime
+    // 
--------------------------------------------------------------------------------------------
+
+    public void accumulate(Bitmap acc, @Nullable Integer value) {
+        if (value != null) {
+            acc.add(value);
+        }
+    }
+
+    public void merge(Bitmap acc, Iterable<Bitmap> its) {
+        for (Bitmap other : its) {
+            acc.or(other);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
new file mode 100644
index 00000000000..c9599b7f31e
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_BUILD_AGG with retraction aggregate function. */
+@Internal
+public final class BitmapBuildWithRetractAggFunction
+        extends BuiltInAggregateFunction<
+                Bitmap, 
BitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public BitmapBuildWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Planning
+    // 
--------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapBuildWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()),
+                DataTypes.FIELD(
+                        "valueCount",
+                        MapView.newMapViewDataType(
+                                DataTypes.INT().notNull(), 
DataTypes.INT().notNull())));
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Accumulator
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_BUILD_AGG with retraction. */
+    public static class BitmapBuildWithRetractAccumulator {
+
+        // bitmap should reflect the actual data based on valueCount
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapBuildWithRetractAccumulator that = 
(BitmapBuildWithRetractAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap)
+                    && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap, valueCount);
+        }
+    }
+
+    @Override
+    public BitmapBuildWithRetractAccumulator createAccumulator() {
+        return new BitmapBuildWithRetractAccumulator();
+    }
+
+    public void resetAccumulator(BitmapBuildWithRetractAccumulator acc) {
+        acc.bitmap.clear();
+        acc.valueCount.clear();
+    }
+
+    @Override
+    public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) {
+        return acc.bitmap.isEmpty() ? null : 
RoaringBitmapData.from(acc.bitmap);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Runtime
+    // 
--------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapBuildWithRetractAccumulator acc, @Nullable 
Integer value)
+            throws Exception {
+        if (value != null) {
+            Integer count = acc.valueCount.get(value);
+            count = count == null ? 1 : count + 1;
+
+            if (count == 0) {
+                acc.valueCount.remove(value);
+            } else {
+                acc.valueCount.put(value, count);
+                // add value to bitmap if count changes from 0 to 1 or expires
+                if (count == 1) {
+                    acc.bitmap.add(value);
+                }
+            }
+        }
+    }
+
+    public void retract(BitmapBuildWithRetractAccumulator acc, @Nullable 
Integer value)
+            throws Exception {
+        if (value != null) {
+            Integer count = acc.valueCount.get(value);
+            count = count == null ? -1 : count - 1;
+
+            if (count == 0) {
+                acc.valueCount.remove(value);
+                // remove value from bitmap if count changes from 1 to 0
+                acc.bitmap.remove(value);
+            } else {
+                acc.valueCount.put(value, count);
+                if (count == -1) {
+                    // remove value from bitmap if count expires
+                    acc.bitmap.remove(value);
+                }
+            }
+        }
+    }
+
+    public void merge(
+            BitmapBuildWithRetractAccumulator acc, 
Iterable<BitmapBuildWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapBuildWithRetractAccumulator other : its) {
+            for (Map.Entry<Integer, Integer> entry : 
other.valueCount.entries()) {
+                Integer value = entry.getKey();
+                // count != 0
+                Integer count = entry.getValue();
+
+                Integer curCount = acc.valueCount.get(value);
+                curCount = curCount == null ? count : curCount + count;
+
+                if (curCount == 0) {
+                    acc.valueCount.remove(value);
+
+                    if (curCount > count) {
+                        // preCount > 0, value is in bitmap
+                        acc.bitmap.remove(value);
+                    }
+                } else {
+                    acc.valueCount.put(value, curCount);
+
+                    if (0 < curCount && curCount <= count) {
+                        // preCount < 0, value is not in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.add(value);
+                    }
+
+                    if (0 > curCount && curCount >= count) {
+                        // preCount > 0, value is in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.remove(value);
+                    }
+                }
+            }
+        }
+    }
+}


Reply via email to