This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26046c653a6d70a1386fc69671fae448ba072fb6
Author: dylanhz <[email protected]>
AuthorDate: Tue Mar 24 02:02:12 2026 +0800

    [FLINK-39187][table] Add the built-in function BITMAP_AND_AGG, 
BITMAP_OR_AGG, BITMAP_XOR_AGG
---
 docs/data/sql_functions.yml                        |  28 ++
 docs/data/sql_functions_zh.yml                     |  28 ++
 .../docs/reference/pyflink.table/expressions.rst   |   3 +
 flink-python/pyflink/table/expression.py           |  30 ++
 .../pyflink/table/tests/test_expression.py         |   3 +
 .../flink/table/api/internal/BaseExpressions.java  |  36 +++
 .../functions/BuiltInFunctionDefinitions.java      |  36 +++
 .../planner/plan/utils/AggFunctionFactory.scala    |  36 +++
 .../planner/functions/BitmapAggFunctionITCase.java | 310 ++++++++++++++++++++-
 .../runtime/batch/sql/OverAggregateITCase.scala    |  30 ++
 .../runtime/stream/sql/AggregateITCase.scala       |  82 ++++++
 .../runtime/stream/sql/OverAggregateITCase.scala   |  40 +++
 .../runtime/stream/sql/WindowAggregateITCase.scala |  28 ++
 .../runtime/stream/table/AggregateITCase.scala     |  41 +++
 .../functions/aggregate/BitmapAndAggFunction.java  | 136 +++++++++
 .../aggregate/BitmapAndWithRetractAggFunction.java | 211 ++++++++++++++
 .../functions/aggregate/BitmapOrAggFunction.java   | 136 +++++++++
 .../aggregate/BitmapOrWithRetractAggFunction.java  | 229 +++++++++++++++
 .../functions/aggregate/BitmapXorAggFunction.java  | 136 +++++++++
 .../aggregate/BitmapXorWithRetractAggFunction.java | 142 ++++++++++
 20 files changed, 1717 insertions(+), 4 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index fd7fc55c949..f1d6c30a418 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1569,6 +1569,16 @@ aggregate:
       ```
 
 bitmapagg:
+  - sql: BITMAP_AND_AGG(bitmap)
+    table: bitmap.bitmapAndAgg()
+    description: |
+      Aggregates the AND (intersection) of multiple bitmaps.
+      
+      NOTE: The retraction variant of this function may have significant 
performance overhead with large bitmaps.
+      
+      `bitmap BITMAP`
+      
+      Returns a `BITMAP`.
   - sql: BITMAP_BUILD_AGG(value)
     table: value.bitmapBuildAgg()
     description: |
@@ -1576,6 +1586,24 @@ bitmapagg:
       
       `value INT`
       
+      Returns a `BITMAP`.
+  - sql: BITMAP_OR_AGG(bitmap)
+    table: bitmap.bitmapOrAgg()
+    description: |
+      Aggregates the OR (union) of multiple bitmaps.
+      
+      NOTE: The retraction variant of this function may have significant 
performance overhead with large bitmaps.
+      
+      `bitmap BITMAP`
+      
+      Returns a `BITMAP`.
+  - sql: BITMAP_XOR_AGG(bitmap)
+    table: bitmap.bitmapXorAgg()
+    description: |
+      Aggregates the XOR (symmetric difference) of multiple bitmaps.
+      
+      `bitmap BITMAP`
+      
       Returns a `BITMAP`.
 
 catalog:
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index c74b830968a..8278e89677f 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1653,6 +1653,16 @@ aggregate:
       ```
 
 bitmapagg:
+  - sql: BITMAP_AND_AGG(bitmap)
+    table: bitmap.bitmapAndAgg()
+    description: |
+      聚合多个位图的交集 (AND)。
+      
+      注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。
+      
+      `bitmap BITMAP`
+      
+      返回一个 `BITMAP`。
   - sql: BITMAP_BUILD_AGG(value)
     table: value.bitmapBuildAgg()
     description: |
@@ -1660,6 +1670,24 @@ bitmapagg:
       
       `value INT`
       
+      返回一个 `BITMAP`。
+  - sql: BITMAP_OR_AGG(bitmap)
+    table: bitmap.bitmapOrAgg()
+    description: |
+      聚合多个位图的并集 (OR)。
+      
+      注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。
+      
+      `bitmap BITMAP`
+      
+      返回一个 `BITMAP`。
+  - sql: BITMAP_XOR_AGG(bitmap)
+    table: bitmap.bitmapXorAgg()
+    description: |
+      聚合多个位图的异或 (XOR)。
+      
+      `bitmap BITMAP`
+      
       返回一个 `BITMAP`。
 
 catalog:
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 727f5e979b0..c54edf08ca2 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -347,12 +347,15 @@ Bitmap functions
 
     Expression.bitmap_and
     Expression.bitmap_andnot
+    Expression.bitmap_and_agg
     Expression.bitmap_build
     Expression.bitmap_build_agg
     Expression.bitmap_cardinality
     Expression.bitmap_from_bytes
     Expression.bitmap_or
+    Expression.bitmap_or_agg
     Expression.bitmap_to_array
     Expression.bitmap_to_bytes
     Expression.bitmap_to_string
     Expression.bitmap_xor
+    Expression.bitmap_xor_agg
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 9241bbadf55..ff3628f0716 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -2299,6 +2299,17 @@ class Expression(Generic[T]):
         """
         return _binary_op("bitmapAndnot")(self, bitmap2)
 
+    def bitmap_and_agg(self) -> 'Expression':
+        """
+        Aggregates the AND (intersection) of multiple bitmaps.
+
+        NOTE: The retraction variant of this function may have significant 
performance overhead
+        with large bitmaps.
+
+        :return: a BITMAP expression
+        """
+        return _unary_op("bitmapAndAgg")(self)
+
     def bitmap_build(self) -> 'Expression':
         """
         Creates a bitmap from an array of 32-bit integers.
@@ -2351,6 +2362,17 @@ class Expression(Generic[T]):
         """
         return _binary_op("bitmapOr")(self, bitmap2)
 
+    def bitmap_or_agg(self) -> 'Expression':
+        """
+        Aggregates the OR (union) of multiple bitmaps.
+
+        NOTE: The retraction variant of this function may have significant 
performance overhead
+        with large bitmaps.
+
+        :return: a BITMAP expression
+        """
+        return _unary_op("bitmapOrAgg")(self)
+
     def bitmap_to_array(self) -> 'Expression':
         """
         Converts a bitmap to an array of 32-bit integers, the values are 
sorted by \
@@ -2403,6 +2425,14 @@ class Expression(Generic[T]):
         """
         return _binary_op("bitmapXor")(self, bitmap2)
 
+    def bitmap_xor_agg(self) -> 'Expression':
+        """
+        Aggregates the XOR (symmetric difference) of multiple bitmaps.
+
+        :return: a BITMAP expression
+        """
+        return _unary_op("bitmapXorAgg")(self)
+
 
 # add the docs
 _make_math_log_doc()
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index f6d10fc99c5..74329037bd0 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -268,15 +268,18 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         # bitmap functions
         self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2)))
         self.assertEqual("BITMAP_ANDNOT(a, b)", 
str(expr1.bitmap_andnot(expr2)))
+        self.assertEqual("BITMAP_AND_AGG(a)", str(expr1.bitmap_and_agg()))
         self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build()))
         self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg()))
         self.assertEqual("BITMAP_CARDINALITY(a)", 
str(expr1.bitmap_cardinality()))
         self.assertEqual("BITMAP_FROM_BYTES(a)", 
str(expr1.bitmap_from_bytes()))
         self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2)))
+        self.assertEqual("BITMAP_OR_AGG(a)", str(expr1.bitmap_or_agg()))
         self.assertEqual("BITMAP_TO_ARRAY(a)", str(expr1.bitmap_to_array()))
         self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes()))
         self.assertEqual("BITMAP_TO_STRING(a)", str(expr1.bitmap_to_string()))
         self.assertEqual("BITMAP_XOR(a, b)", str(expr1.bitmap_xor(expr2)))
+        self.assertEqual("BITMAP_XOR_AGG(a)", str(expr1.bitmap_xor_agg()))
 
     def test_expressions(self):
         expr1 = col('a')
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 086ef39867e..90735d6f7fc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -78,15 +78,18 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEE
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_ARRAY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_STRING;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
@@ -2641,6 +2644,18 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(BITMAP_ANDNOT, toExpr(), 
objectToExpression(bitmap2)));
     }
 
+    /**
+     * Aggregates the AND (intersection) of multiple bitmaps.
+     *
+     * <p>The expression this method is called on must be of type BITMAP; the
+     * function's only accepted signature is BITMAP_AND_AGG(bitmap BITMAP).
+     *
+     * <p>Null inputs are skipped, and the result is null when a group holds
+     * no non-null bitmap (see the cases in BitmapAggFunctionITCase).
+     *
+     * <p>NOTE: The retraction variant of this function may have significant 
performance overhead
+     * with large bitmaps.
+     *
+     * @return a BITMAP expression
+     */
+    public OutType bitmapAndAgg() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_AND_AGG, 
toExpr()));
+    }
+
     /**
      * Creates a bitmap from an array of 32-bit integers.
      *
@@ -2700,6 +2715,18 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(BITMAP_OR, toExpr(), 
objectToExpression(bitmap2)));
     }
 
+    /**
+     * Aggregates the OR (union) of multiple bitmaps.
+     *
+     * <p>The expression this method is called on must be of type BITMAP, the
+     * only argument type accepted by BITMAP_OR_AGG.
+     *
+     * <p>NOTE: The retraction variant of this function may have significant 
performance overhead
+     * with large bitmaps.
+     *
+     * @return a BITMAP expression
+     */
+    public OutType bitmapOrAgg() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_OR_AGG, 
toExpr()));
+    }
+
     /**
      * Converts a bitmap to an array of 32-bit integers, the values are sorted 
by {@link
      * Integer#compareUnsigned}.
@@ -2759,4 +2786,13 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(
                 unresolvedCall(BITMAP_XOR, toExpr(), 
objectToExpression(bitmap2)));
     }
+
+    /**
+     * Aggregates the XOR (symmetric difference) of multiple bitmaps.
+     *
+     * <p>The expression this method is called on must be of type BITMAP, the
+     * only argument type accepted by BITMAP_XOR_AGG.
+     *
+     * @return a BITMAP expression
+     */
+    public OutType bitmapXorAgg() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_AGG, 
toExpr()));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 44c40421e30..f20f871baf0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -3015,6 +3015,18 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.BitmapAndnotFunction")
                     .build();
 
+    /**
+     * Aggregate function {@code BITMAP_AND_AGG(bitmap)}: takes exactly one
+     * argument of logical type BITMAP and declares BITMAP as its output type.
+     */
+    public static final BuiltInFunctionDefinition BITMAP_AND_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_AND_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BITMAP()))
+                    // No runtime class is named here; the planner's
+                    // AggFunctionFactory maps this definition to an
+                    // implementation.
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_BUILD =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_BUILD")
@@ -3091,6 +3103,18 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.BitmapOrFunction")
                     .build();
 
+    /**
+     * Aggregate function {@code BITMAP_OR_AGG(bitmap)}: takes exactly one
+     * argument of logical type BITMAP and declares BITMAP as its output type.
+     */
+    public static final BuiltInFunctionDefinition BITMAP_OR_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_OR_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BITMAP()))
+                    // No runtime class is named here; the planner's
+                    // AggFunctionFactory maps this definition to an
+                    // implementation.
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_TO_ARRAY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_TO_ARRAY")
@@ -3145,6 +3169,18 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.BitmapXorFunction")
                     .build();
 
+    /**
+     * Aggregate function {@code BITMAP_XOR_AGG(bitmap)}: takes exactly one
+     * argument of logical type BITMAP and declares BITMAP as its output type.
+     */
+    public static final BuiltInFunctionDefinition BITMAP_XOR_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_XOR_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BITMAP()))
+                    // No runtime class is named here; the planner's
+                    // AggFunctionFactory maps this definition to an
+                    // implementation.
+                    .runtimeProvided()
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Other functions
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 3eecd384abf..20bedac6c13 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -176,6 +176,12 @@ class AggFunctionFactory(
             createPercentileAggFunction(argTypes)
           case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG =>
             createBitmapBuildAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_AND_AGG =>
+            createBitmapAndAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_OR_AGG =>
+            createBitmapOrAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_XOR_AGG =>
+            createBitmapXorAggFunction(argTypes, index)
           // DeclarativeAggregateFunction & UDF
           case _ =>
             bridge.getDefinition.asInstanceOf[UserDefinedFunction]
@@ -661,4 +667,34 @@ class AggFunctionFactory(
       new BitmapBuildAggFunction(argTypes(0))
     }
   }
+
+  /**
+   * Chooses the BITMAP_AND_AGG implementation: the retract-capable variant
+   * when the aggregate call at `index` needs retractions, the plain one
+   * otherwise. The first argument type is passed to the constructor.
+   */
+  private def createBitmapAndAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new BitmapAndWithRetractAggFunction(argTypes(0))
+    } else {
+      new BitmapAndAggFunction(argTypes(0))
+    }
+  }
+
+  /**
+   * Chooses the BITMAP_OR_AGG implementation: the retract-capable variant
+   * when the aggregate call at `index` needs retractions, the plain one
+   * otherwise. The first argument type is passed to the constructor.
+   */
+  private def createBitmapOrAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new BitmapOrWithRetractAggFunction(argTypes(0))
+    } else {
+      new BitmapOrAggFunction(argTypes(0))
+    }
+  }
+
+  /**
+   * Chooses the BITMAP_XOR_AGG implementation: the retract-capable variant
+   * when the aggregate call at `index` needs retractions, the plain one
+   * otherwise. The first argument type is passed to the constructor.
+   */
+  private def createBitmapXorAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new BitmapXorWithRetractAggFunction(argTypes(0))
+    } else {
+      new BitmapXorAggFunction(argTypes(0))
+    }
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
index 8312da400e3..cbd79bfea2a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
@@ -28,12 +28,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.api.DataTypes.ARRAY;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.BITMAP;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.array;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -45,10 +47,109 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
     @Override
     Stream<TestSpec> getTestCaseSpecs() {
+        // Gather the specs of all four bitmap aggregates into one stream.
         final List<TestSpec> specs = new ArrayList<>();
+        specs.addAll(bitmapAndAggTestCases());
         specs.addAll(bitmapBuildAggTestCases());
+        specs.addAll(bitmapOrAggTestCases());
+        specs.addAll(bitmapXorAggTestCases());
         return specs.stream();
     }
 
+    /**
+     * Specs for BITMAP_AND_AGG: an insert-only run, a changelog run with
+     * DELETE/UPDATE rows, and a validation failure for non-BITMAP input.
+     */
+    private List<TestSpec> bitmapAndAggTestCases() {
+        return Arrays.asList(
+                // Insert-only input. Null rows do not change the intersection
+                // (groups A and C); an all-null group yields null (group D).
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG)
+                        .withDescription("without retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(1, 2, 3), 
"A"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4), 
"A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), 
"A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), 
"B"),
+                                        Row.ofKind(INSERT, fromArray(1, 2, 4, 
6), "B"),
+                                        Row.ofKind(INSERT, fromArray(4, 6, 8, 
12, 16), "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 
1), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -2), 
"C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(INSERT, null, "D")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_AND_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapAndAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", fromArray(3)),
+                                        Row.of("B", fromArray(4, 6)),
+                                        Row.of("C", fromArray(-1)),
+                                        Row.of("D", null))),
+                // Changelog input. The expected rows equal the intersection of
+                // the bitmaps left once every DELETE / UPDATE_BEFORE has
+                // cancelled one matching insert. In group B the early DELETE
+                // of {2, 4, 6} is offset by a later insert; group A is left
+                // with only a null row, so its result is null.
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG)
+                        .withDescription("with retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), 
"A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(DELETE, fromArray(2, 4, 6), 
"A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), 
"B"),
+                                        Row.ofKind(DELETE, fromArray(1, 3, 5), 
"B"),
+                                        Row.ofKind(INSERT, null, "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 2, 
3, 4), "B"),
+                                        Row.ofKind(DELETE, fromArray(2, 4, 6), 
"B"), // count < 0
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4, 
5, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), 
"B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), 
"B"),
+                                        Row.ofKind(INSERT, fromArray(1, 4, 7), 
"B"),
+                                        Row.ofKind(DELETE, fromArray(1, 4, 7), 
"B"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 
4, 6), "B"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(3, 
4, 5), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 
11), "C"),
+                                        Row.ofKind(INSERT, fromArray(1, 5, 
13), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -3, 
0), "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(DELETE, fromArray(-1, -3, 
0), "C"),
+                                        Row.ofKind(DELETE, fromArray(1, 5, 
13), "C"),
+                                        Row.ofKind(DELETE, null, "C"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 
3, 11), "C"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(1, 
2), "C")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_AND_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapAndAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", null),
+                                        Row.of("B", fromArray(3, 4)),
+                                        Row.of("C", fromArray(1, 2)))),
+                // Invalid argument types: the SQL call passes f0 (INT) and the
+                // Table API call passes f1 (ARRAY<INT>); the expected error
+                // lists the only accepted signature.
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG)
+                        .withDescription("Validation Error")
+                        .withSource(
+                                ROW(INT(), ARRAY(INT()), STRING()),
+                                Collections.singletonList(Row.ofKind(INSERT, 
1, array(1, 2), "A")))
+                        .testValidationError(
+                                source ->
+                                        "SELECT f2, BITMAP_AND_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f2",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f2")),
+                                        $("f2"),
+                                        $("f1").bitmapAndAgg()),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_AND_AGG(bitmap <BITMAP>)"));
+    }
+
     private List<TestSpec> bitmapBuildAggTestCases() {
         return Arrays.asList(
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
@@ -79,8 +180,8 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                 ROW(STRING(), BITMAP()),
                                 ROW(STRING(), BITMAP()),
                                 Arrays.asList(
-                                        Row.of("A", Bitmap.fromArray(new int[] 
{-1, 1, 2, 3, 4})),
-                                        Row.of("B", Bitmap.fromArray(new int[] 
{-1, 1, 2})),
+                                        Row.of("A", fromArray(-1, 1, 2, 3, 4)),
+                                        Row.of("B", fromArray(-1, 1, 2)),
                                         Row.of("C", null))),
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
                         .withDescription("with retraction")
@@ -124,8 +225,8 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                 ROW(STRING(), BITMAP()),
                                 Arrays.asList(
                                         Row.of("A", null),
-                                        Row.of("B", Bitmap.fromArray(new int[] 
{-1, 1, 2})),
-                                        Row.of("C", Bitmap.fromArray(new int[] 
{1})))),
+                                        Row.of("B", fromArray(-1, 1, 2)),
+                                        Row.of("C", fromArray(1)))),
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
                         .withDescription("Validation Error")
                         .withSource(
@@ -143,4 +244,205 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                 "Invalid input arguments. Expected signatures 
are:\n"
                                         + "BITMAP_BUILD_AGG(value 
<INTEGER>)"));
     }
+
+    /**
+     * Test specs for {@code BITMAP_OR_AGG}.
+     *
+     * <ul>
+     *   <li>"without retraction": insert-only rows grouped by {@code f1}; the expected value is
+     *       the union of the non-null bitmaps of each group.
+     *   <li>"with retraction": the source also carries DELETE / UPDATE_BEFORE / UPDATE_AFTER
+     *       rows, including a DELETE of a bitmap that was not inserted before.
+     *   <li>"Validation Error": a non-BITMAP argument (INT in SQL, ARRAY of INT in the Table API)
+     *       must be rejected with the expected-signature message.
+     * </ul>
+     *
+     * <p>A group whose inputs are all {@code null}, or whose non-null inputs were all deleted
+     * again, is expected to yield {@code null}.
+     */
+    private List<TestSpec> bitmapOrAggTestCases() {
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG)
+                        .withDescription("without retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4), "A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -2), "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(INSERT, null, "D")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_OR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapOrAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", fromArray(1, 2, 3, 4, 5)),
+                                        // Uses the varargs helper like every other expected row.
+                                        Row.of("B", fromArray(1, 2, 4, 6, 8, 12, 16)),
+                                        Row.of("C", fromArray(0, 1, -2, -1)),
+                                        Row.of("D", null))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG)
+                        .withDescription("with retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(DELETE, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), "B"),
+                                        Row.ofKind(DELETE, fromArray(1, 3, 5), "B"),
+                                        Row.ofKind(INSERT, null, "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"),
+                                        Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(1, 4, 7), "B"),
+                                        Row.ofKind(DELETE, fromArray(1, 4, 7), "B"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 11), "C"),
+                                        Row.ofKind(INSERT, fromArray(1, 5, 13), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"),
+                                        Row.ofKind(DELETE, fromArray(1, 5, 13), "C"),
+                                        Row.ofKind(DELETE, null, "C"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_OR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapOrAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", null),
+                                        Row.of("B", fromArray(0, 2, 3, 4, 5, 6, -1)),
+                                        Row.of("C", fromArray(1, 2)))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG)
+                        .withDescription("Validation Error")
+                        .withSource(
+                                ROW(INT(), ARRAY(INT()), STRING()),
+                                Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A")))
+                        .testValidationError(
+                                source ->
+                                        "SELECT f2, BITMAP_OR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f2",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f2")),
+                                        $("f2"),
+                                        $("f1").bitmapOrAgg()),
+                                "Invalid input arguments. Expected signatures are:\n"
+                                        + "BITMAP_OR_AGG(bitmap <BITMAP>)"));
+    }
+
+    /**
+     * Test specs for {@code BITMAP_XOR_AGG}.
+     *
+     * <ul>
+     *   <li>"without retraction": insert-only rows grouped by {@code f1}; the expected value
+     *       holds the elements that occur in an odd number of the group's non-null bitmaps.
+     *   <li>"with retraction": the source also carries DELETE / UPDATE_BEFORE / UPDATE_AFTER
+     *       rows, including a DELETE of a bitmap that was not inserted before.
+     *   <li>"Validation Error": a non-BITMAP argument (INT in SQL, ARRAY of INT in the Table API)
+     *       must be rejected with the expected-signature message.
+     * </ul>
+     *
+     * <p>A group whose inputs are all {@code null}, or whose non-null inputs were all deleted
+     * again, is expected to yield {@code null}.
+     */
+    private List<TestSpec> bitmapXorAggTestCases() {
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG)
+                        .withDescription("without retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4), "A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -2), "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(INSERT, null, "D")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_XOR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapXorAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", fromArray(3, 4, 5)),
+                                        Row.of("B", fromArray(1, 4, 6, 8, 12, 16)),
+                                        Row.of("C", fromArray(0, 1, -2)),
+                                        Row.of("D", null))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG)
+                        .withDescription("with retraction")
+                        .withSource(
+                                ROW(BITMAP(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, null, "A"),
+                                        Row.ofKind(DELETE, fromArray(1, 2, 3), "A"),
+                                        Row.ofKind(INSERT, fromArray(1, 3, 5), "B"),
+                                        Row.ofKind(DELETE, fromArray(1, 3, 5), "B"),
+                                        Row.ofKind(INSERT, null, "B"),
+                                        Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"),
+                                        Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0
+                                        Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(INSERT, fromArray(1, 4, 7), "B"),
+                                        Row.ofKind(DELETE, fromArray(1, 4, 7), "B"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"),
+                                        Row.ofKind(INSERT, fromArray(2, 3, 11), "C"),
+                                        Row.ofKind(INSERT, fromArray(1, 5, 13), "C"),
+                                        Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"),
+                                        Row.ofKind(INSERT, null, "C"),
+                                        Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"),
+                                        Row.ofKind(DELETE, fromArray(1, 5, 13), "C"),
+                                        Row.ofKind(DELETE, null, "C"),
+                                        Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"),
+                                        Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C")))
+                        .testResult(
+                                source ->
+                                        "SELECT f1, BITMAP_XOR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f1",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f1")),
+                                        $("f1"),
+                                        $("f0").bitmapXorAgg()),
+                                ROW(STRING(), BITMAP()),
+                                ROW(STRING(), BITMAP()),
+                                Arrays.asList(
+                                        Row.of("A", null),
+                                        Row.of("B", fromArray(0, 3, 4, 6, -1)),
+                                        Row.of("C", fromArray(1, 2)))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG)
+                        .withDescription("Validation Error")
+                        .withSource(
+                                ROW(INT(), ARRAY(INT()), STRING()),
+                                Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A")))
+                        .testValidationError(
+                                source ->
+                                        "SELECT f2, BITMAP_XOR_AGG(f0) FROM "
+                                                + source
+                                                + " GROUP BY f2",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f2")),
+                                        $("f2"),
+                                        $("f1").bitmapXorAgg()),
+                                "Invalid input arguments. Expected signatures are:\n"
+                                        + "BITMAP_XOR_AGG(bitmap <BITMAP>)"));
+    }
+
+    // ~ Utils --------------------------------------------------------------------
+
+    /** Varargs shorthand for {@link Bitmap#fromArray(int[])} used by the test data above. */
+    private Bitmap fromArray(int... values) {
+        final Bitmap bitmap = Bitmap.fromArray(values);
+        return bitmap;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index c9ca677c07a..69040c961d8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -3004,6 +3004,36 @@ class OverAggregateITCase extends BatchTestBase {
       )
     )
   }
+
+  /**
+   * Evaluates BITMAP_AND_AGG, BITMAP_OR_AGG and BITMAP_XOR_AGG as over-aggregates on Table5. Each
+   * input bitmap is built from the columns `d` and `f`; the frame is the current row plus the two
+   * preceding rows in `e` order. Bitmaps are compared through their string form, e.g. "{0,1}".
+   */
+  @Test
+  def testBitmapLogicalOpsAgg(): Unit = {
+    checkResult(
+      "SELECT " +
+        "d, f, " +
+        "BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+        "BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+        "BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+        "FROM Table5",
+      Seq(
+        row(1, 0, "{0,1}", "{0,1}", "{0,1}"),
+        row(2, 1, "{1}", "{0,1,2}", "{0,2}"),
+        row(2, 2, "{}", "{0,1,2}", "{0}"),
+        row(3, 3, "{}", "{1,2,3}", "{1,3}"),
+        row(3, 4, "{}", "{2,3,4}", "{2,4}"),
+        row(3, 5, "{3}", "{3,4,5}", "{3,4,5}"),
+        row(4, 6, "{}", "{3,4,5,6}", "{5,6}"),
+        row(4, 7, "{}", "{3,4,5,6,7}", "{3,5,6,7}"),
+        row(4, 8, "{4}", "{4,6,7,8}", "{4,6,7,8}"),
+        row(4, 9, "{4}", "{4,7,8,9}", "{4,7,8,9}"),
+        row(5, 10, "{}", "{4,5,8,9,10}", "{5,8,9,10}"),
+        row(5, 11, "{}", "{4,5,9,10,11}", "{4,9,10,11}"),
+        row(5, 12, "{5}", "{5,10,11,12}", "{5,10,11,12}"),
+        row(5, 13, "{5}", "{5,11,12,13}", "{5,11,12,13}"),
+        row(5, 14, "{5}", "{5,12,13,14}", "{5,12,13,14}")
+      )
+    )
+  }
+
 }
 
 /** The initial accumulator for count aggregate function */
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index cd2adf0154a..fbee1a8585f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -2190,6 +2190,88 @@ class AggregateITCase(
     val expected = List(s"{4,5,6},{4,8,12}")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
+
+  /**
+   * Group aggregation with BITMAP_AND_AGG, BITMAP_OR_AGG and BITMAP_XOR_AGG in SQL. Every row
+   * contributes the bitmap built from its three integer columns and rows are grouped by the
+   * string key `d`. Results are cast to STRING; negative inputs are expected to show up as their
+   * unsigned 32-bit value, hence the `Integer.toUnsignedLong` calls in the expected rows.
+   */
+  @TestTemplate
+  def testBitmapLogicalOpsAgg(): Unit = {
+    val data = new mutable.MutableList[(Int, Int, Int, String)]
+    data.+=((-3, 5, 0, "a"))
+    data.+=((7, 2, 5, "b"))
+    data.+=((-3, 8, -8, "c"))
+    data.+=((2, 1, 7, "b"))
+    data.+=((2, 9, 0, "a"))
+    data.+=((8, 3, -3, "c"))
+    data.+=((7, 6, 2, "b"))
+    data.+=((0, 4, 5, "a"))
+    data.+=((-3, 7, 8, "c"))
+    data.+=((5, 0, 2, "b"))
+    data.+=((0, 10, 5, "a"))
+    data.+=((2, 5, 0, "a"))
+
+    val sql =
+      """
+        |SELECT
+        |  d,
+        |  CAST(BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING),
+        |  CAST(BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING),
+        |  CAST(BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING)
+        |FROM MyTable
+        |GROUP BY d
+      """.stripMargin
+
+    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    // Columns: key, AND result, OR result, XOR result.
+    val expected =
+      List(
+        s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}",
+        "b,{2},{0,1,2,5,6,7},{0,1,6,7}",
+        s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer
+            .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer
+            .toUnsignedLong(-3)}}"
+      )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  /**
+   * Runs the three bitmap aggregates on top of an updating input: the inner query keeps the
+   * LAST_VALUE bitmap per key `d`, and the outer query aggregates those bitmaps without grouping.
+   * With rows arriving in insertion order the last bitmaps per key are {1,2,3}, {2,3,4} and
+   * {3,4,5}, which gives AND = {3}, OR = {1,2,3,4,5} and XOR = {1,3,5}.
+   */
+  @TestTemplate
+  def testBitmapLogicalOpsAggWithRetract(): Unit = {
+    val rows = new mutable.MutableList[(Int, Int, Int, String)]
+    // For i = 3, 2, 1 append the "a", "b" and "c" rows, in that order.
+    (3 until 0 by -1).foreach {
+      i =>
+        Seq(("a", i), ("b", i * 2), ("c", i * 3)).foreach {
+          case (key, base) => rows.+=((base, base + 1, base + 2, key))
+        }
+    }
+
+    val lastPerKey =
+      """
+        |SELECT
+        |  LAST_VALUE(BITMAP_BUILD(ARRAY[a, b, c])) AS last,
+        |  d
+        |FROM MyTable
+        |GROUP BY d
+      """.stripMargin
+    val query =
+      s"""
+         |SELECT
+         |  CAST(BITMAP_AND_AGG(last) AS STRING),
+         |  CAST(BITMAP_OR_AGG(last) AS STRING),
+         |  CAST(BITMAP_XOR_AGG(last) AS STRING)
+         |FROM ($lastPerKey)
+      """.stripMargin
+
+    val table = failingDataSource(rows).toTable(tEnv, 'a, 'b, 'c, 'd)
+    tEnv.createTemporaryView("MyTable", table)
+
+    val retractSink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(retractSink).setParallelism(1)
+    env.execute()
+
+    val expected = List(s"{3},{1,2,3,4,5},{1,3,5}")
+    assertThat(retractSink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
 }
 
 object AggregateITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
index c4deb3500a9..a1f60de7746 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
@@ -1538,6 +1538,46 @@ class OverAggregateITCase(mode: StateBackendMode, 
unboundedOverVersion: Int)
     )
     assertThat(sink.getAppendResults).isEqualTo(expected)
   }
+
+  /**
+   * Evaluates BITMAP_AND_AGG, BITMAP_OR_AGG and BITMAP_XOR_AGG as processing-time over-aggregates
+   * with a frame of the current row and the two preceding rows. Each input bitmap is built from
+   * the columns `a` and `c` of `TestData.tupleData5`. The sink runs with parallelism 1 and the
+   * results are compared in emission order (no sorting).
+   */
+  @TestTemplate
+  def testBitmapLogicalOpsAgg(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  a, c,
+        |  BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+        |  BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+        |  BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
+        |FROM MyTable
+      """.stripMargin
+
+    val t =
+      failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1)
+    env.execute()
+
+    // Columns: a, c, AND result, OR result, XOR result.
+    val expected = List(
+      "1,0,{0,1},{0,1},{0,1}",
+      "2,1,{1},{0,1,2},{0,2}",
+      "2,2,{},{0,1,2},{0}",
+      "3,3,{},{1,2,3},{1,3}",
+      "3,4,{},{2,3,4},{2,4}",
+      "3,5,{3},{3,4,5},{3,4,5}",
+      "4,6,{},{3,4,5,6},{5,6}",
+      "4,7,{},{3,4,5,6,7},{3,5,6,7}",
+      "4,8,{4},{4,6,7,8},{4,6,7,8}",
+      "4,9,{4},{4,7,8,9},{4,7,8,9}",
+      "5,10,{},{4,5,8,9,10},{5,8,9,10}",
+      "5,11,{},{4,5,9,10,11},{4,9,10,11}",
+      "5,12,{5},{5,10,11,12},{5,10,11,12}",
+      "5,13,{5},{5,11,12,13},{5,11,12,13}",
+      "5,14,{5},{5,12,13,14},{5,12,13,14}"
+    )
+    assertThat(sink.getAppendResults).isEqualTo(expected)
+  }
 }
 
 object OverAggregateITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 0c498882137..52c8ff9d131 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -1116,6 +1116,34 @@ class WindowAggregateITCase(
         })
       .map(_.mkString(","))
   }
+
+  /**
+   * Evaluates BITMAP_AND_AGG, BITMAP_OR_AGG and BITMAP_XOR_AGG per 5-second event-time tumbling
+   * window over the table `T1_CDC`. Each row contributes the bitmap of `int`, 2*`int` and
+   * 4*`int`. The sink output is sorted before it is compared with the expected windows.
+   */
+  @TestTemplate
+  def testBitmapLogicalOpsAggOnEventTimeTumbleWindow(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])),
+        |  BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])),
+        |  BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`]))
+        |FROM TABLE(
+        |   TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+        |GROUP BY window_start, window_end
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink)
+    env.execute()
+
+    // Columns: window_start, window_end, AND result, OR result, XOR result.
+    val expected = List(
+      "2020-10-10T00:00,2020-10-10T00:00:05,{},{2,4,5,8,10,20,22,44,88},{2,4,5,8,10,20,22,44,88}",
+      "2020-10-10T00:00:05,2020-10-10T00:00:10,{6,12},{3,6,12,24},{6,12,24}",
+      "2020-10-10T00:00:15,2020-10-10T00:00:20,{4,8,16},{4,8,16},{4,8,16}"
+    )
+    val result = sink.getAppendResults.sorted
+    // Compare as one newline-joined string so a mismatch is reported as a single readable diff.
+    assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n"))
+  }
 }
 
 object WindowAggregateITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 29a6e80a2e7..08e27c452cc 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -635,4 +635,45 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     )
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
+
+  /**
+   * Group aggregation with `bitmapAndAgg`, `bitmapOrAgg` and `bitmapXorAgg` through the Table
+   * API. Every row contributes the bitmap built from its three integer columns and rows are
+   * grouped by the string key `d`. Results are cast to STRING; negative inputs are expected to
+   * show up as their unsigned 32-bit value, hence the `Integer.toUnsignedLong` calls below.
+   */
+  @TestTemplate
+  def testBitmapLogicalOpsAgg(): Unit = {
+    val data = new mutable.MutableList[(Int, Int, Int, String)]
+    data.+=((-3, 5, 0, "a"))
+    data.+=((7, 2, 5, "b"))
+    data.+=((-3, 8, -8, "c"))
+    data.+=((2, 1, 7, "b"))
+    data.+=((2, 9, 0, "a"))
+    data.+=((8, 3, -3, "c"))
+    data.+=((7, 6, 2, "b"))
+    data.+=((0, 4, 5, "a"))
+    data.+=((-3, 7, 8, "c"))
+    data.+=((5, 0, 2, "b"))
+    data.+=((0, 10, 5, "a"))
+    data.+=((2, 5, 0, "a"))
+
+    val t = failingDataSource(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .groupBy('d)
+      .select(
+        'd,
+        array('a, 'b, 'c).bitmapBuild().bitmapAndAgg().cast(DataTypes.STRING()),
+        array('a, 'b, 'c).bitmapBuild().bitmapOrAgg().cast(DataTypes.STRING()),
+        array('a, 'b, 'c).bitmapBuild().bitmapXorAgg().cast(DataTypes.STRING())
+      )
+
+    val sink = new TestingRetractSink
+    t.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    // Columns: key, AND result, OR result, XOR result.
+    val expected =
+      List(
+        s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}",
+        "b,{2},{0,1,2,5,6,7},{0,1,6,7}",
+        s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer
+            .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer
+            .toUnsignedLong(-3)}}"
+      )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
new file mode 100644
index 00000000000..4c1ad8a33b5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/**
+ * Built-in BITMAP_AND_AGG aggregate function.
+ *
+ * <p>Keeps a single nullable bitmap in the accumulator: the first non-null input initializes it
+ * and every later non-null input is combined into it with {@code and}. {@code NULL} inputs are
+ * ignored. This variant defines no {@code retract} method.
+ */
+@Internal
+public final class BitmapAndAggFunction
+        extends BuiltInAggregateFunction<Bitmap, BitmapAndAggFunction.BitmapAndAccumulator> {
+
+    /** Data type of the single argument; only read by {@link #getArgumentDataTypes()}. */
+    private final transient DataType valueDataType;
+
+    /**
+     * @param valueType logical type of the aggregated argument, converted to its internal data
+     *     type via {@code toInternalDataType}
+     */
+    public BitmapAndAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /** Describes the accumulator as a structured type with one BITMAP field named "bitmap". */
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapAndAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP()));
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_AND_AGG. */
+    public static class BitmapAndAccumulator {
+
+        /** Running result; {@code null} until the first non-null bitmap has been seen. */
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapAndAccumulator that = (BitmapAndAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    @Override
+    public BitmapAndAccumulator createAccumulator() {
+        return new BitmapAndAccumulator();
+    }
+
+    /** Puts the accumulator back into its initial state (no bitmap seen yet). */
+    public void resetAccumulator(BitmapAndAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    /**
+     * Returns the accumulated bitmap via {@code Bitmap.from}.
+     *
+     * <p>NOTE(review): {@code acc.bitmap} is still {@code null} when no non-null input was
+     * accumulated; presumably {@code Bitmap.from(null)} yields {@code null} - confirm.
+     */
+    @Override
+    public Bitmap getValue(BitmapAndAccumulator acc) {
+        return Bitmap.from(acc.bitmap);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Folds one input bitmap into the accumulator.
+     *
+     * @param acc accumulator to update
+     * @param bitmap input value; {@code null} is ignored
+     */
+    public void accumulate(BitmapAndAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        if (acc.bitmap != null) {
+            acc.bitmap.and(bitmap);
+        } else {
+            // First non-null input: it becomes the starting value of the running result.
+            acc.bitmap = RoaringBitmapData.from(bitmap);
+        }
+    }
+
+    /**
+     * Folds the bitmaps of other accumulators into {@code acc}, using the same rule as {@link
+     * #accumulate}. Accumulators that have not seen a non-null bitmap are skipped.
+     *
+     * @param acc accumulator to update
+     * @param its accumulators to merge into {@code acc}
+     */
+    public void merge(BitmapAndAccumulator acc, Iterable<BitmapAndAccumulator> its) {
+        for (BitmapAndAccumulator other : its) {
+            if (other.bitmap != null) {
+                if (acc.bitmap != null) {
+                    acc.bitmap.and(other.bitmap);
+                } else {
+                    acc.bitmap = RoaringBitmapData.from(other.bitmap);
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
new file mode 100644
index 00000000000..dd77b3fb339
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_AND_AGG with retraction aggregate function. */
+@Internal
+public final class BitmapAndWithRetractAggFunction
+        extends BuiltInAggregateFunction<
+                Bitmap, 
BitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    /**
+     * Creates the function for the given argument type.
+     *
+     * @param valueType logical type of the argument; it is converted with {@code
+     *     toInternalDataType} and kept as the argument data type
+     */
+    public BitmapAndWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a one-element list holding the argument data type set in the constructor. */
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /**
+     * Describes the accumulator as a structured type with a non-null INT {@code bitmapCount} and
+     * a {@code valueCount} map view from non-null INT to non-null INT.
+     */
+    @Override
+    public DataType getAccumulatorDataType() {
+        final DataType countsType =
+                MapView.newMapViewDataType(DataTypes.INT().notNull(), DataTypes.INT().notNull());
+        return DataTypes.STRUCTURED(
+                BitmapAndWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()),
+                DataTypes.FIELD("valueCount", countsType));
+    }
+
+    /** The result type is always {@code DataTypes.BITMAP()}. */
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_AND_AGG with retraction. */
+    public static class BitmapAndWithRetractAccumulator {
+
+        /** Net number of non-null bitmaps: accumulate increments it, retract decrements it. */
+        public int bitmapCount = 0;
+
+        /** Net occurrence count per bitmap value; an entry is removed when it reaches zero. */
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            final BitmapAndWithRetractAccumulator other = (BitmapAndWithRetractAccumulator) obj;
+            if (bitmapCount != other.bitmapCount) {
+                return false;
+            }
+            return Objects.equals(valueCount, other.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, valueCount);
+        }
+    }
+
+    /** Creates a new accumulator with a zero bitmap count and an empty value-count view. */
+    @Override
+    public BitmapAndWithRetractAccumulator createAccumulator() {
+        return new BitmapAndWithRetractAccumulator();
+    }
+
+    /** Resets the accumulator: clears all per-value counts and zeroes the bitmap count. */
+    public void resetAccumulator(BitmapAndWithRetractAccumulator acc) {
+        acc.valueCount.clear();
+        acc.bitmapCount = 0;
+    }
+
+    /**
+     * Builds the result from the per-value counts.
+     *
+     * @return {@code null} when the net bitmap count is not positive; otherwise a bitmap holding
+     *     every value whose count equals the bitmap count
+     * @throws FlinkRuntimeException wrapping any exception raised while reading the map view
+     */
+    @Override
+    public Bitmap getValue(BitmapAndWithRetractAccumulator acc) {
+        final int total = acc.bitmapCount;
+        if (total <= 0) {
+            return null;
+        }
+
+        final RoaringBitmapData result = RoaringBitmapData.empty();
+        try {
+            for (Map.Entry<Integer, Integer> counted : acc.valueCount.entries()) {
+                // keep only the values counted once for every accumulated bitmap
+                if (counted.getValue() == total) {
+                    result.add(counted.getKey());
+                }
+            }
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e);
+        }
+        return result;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Adds one bitmap: increments the bitmap count and the count of every value it contains.
+     *
+     * <p>{@code null} inputs are ignored. A value without a stored count starts at 1; a count that
+     * lands on zero is removed instead of stored. Map view failures are rethrown as {@link
+     * FlinkRuntimeException}.
+     */
+    public void accumulate(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount++;
+
+        RoaringBitmapData.toRoaringBitmapData(bitmap)
+                .forEach(
+                        element -> {
+                            try {
+                                final Integer previous = acc.valueCount.get(element);
+                                final int updated = previous == null ? 1 : previous + 1;
+
+                                if (updated != 0) {
+                                    acc.valueCount.put(element, updated);
+                                } else {
+                                    acc.valueCount.remove(element);
+                                }
+                            } catch (Exception e) {
+                                throw new FlinkRuntimeException(e);
+                            }
+                        });
+    }
+
+    /**
+     * Retracts one bitmap: decrements the bitmap count and the count of every value it contains.
+     *
+     * <p>{@code null} inputs are ignored. A value without a stored count starts at -1; a count
+     * that lands on zero is removed instead of stored. Map view failures are rethrown as {@link
+     * FlinkRuntimeException}.
+     */
+    public void retract(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount--;
+
+        RoaringBitmapData.toRoaringBitmapData(bitmap)
+                .forEach(
+                        element -> {
+                            try {
+                                final Integer previous = acc.valueCount.get(element);
+                                final int updated = previous == null ? -1 : previous - 1;
+
+                                if (updated != 0) {
+                                    acc.valueCount.put(element, updated);
+                                } else {
+                                    acc.valueCount.remove(element);
+                                }
+                            } catch (Exception e) {
+                                throw new FlinkRuntimeException(e);
+                            }
+                        });
+    }
+
+    /**
+     * Merges the given partial accumulators into {@code acc}.
+     *
+     * <p>Bitmap counts are summed. Per-value counts are summed entry by entry, and an entry whose
+     * sum is zero is removed from {@code acc}.
+     */
+    public void merge(
+            BitmapAndWithRetractAccumulator acc, Iterable<BitmapAndWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapAndWithRetractAccumulator partial : its) {
+            acc.bitmapCount += partial.bitmapCount;
+
+            for (Map.Entry<Integer, Integer> counted : partial.valueCount.entries()) {
+                final Integer key = counted.getKey();
+                final Integer delta = counted.getValue();
+
+                final Integer stored = acc.valueCount.get(key);
+                final int merged = stored == null ? delta : stored + delta;
+
+                if (merged != 0) {
+                    acc.valueCount.put(key, merged);
+                } else {
+                    acc.valueCount.remove(key);
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
new file mode 100644
index 00000000000..19bdd5db219
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_OR_AGG aggregate function. */
+@Internal
+public final class BitmapOrAggFunction
+        extends BuiltInAggregateFunction<Bitmap, 
BitmapOrAggFunction.BitmapOrAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    /**
+     * Creates the function for the given argument type.
+     *
+     * @param valueType logical type of the argument; it is converted with {@code
+     *     toInternalDataType} and kept as the argument data type
+     */
+    public BitmapOrAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a one-element list holding the argument data type set in the constructor. */
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /** Describes the accumulator as a structured type with a single BITMAP field {@code bitmap}. */
+    @Override
+    public DataType getAccumulatorDataType() {
+        final DataTypes.Field bitmapField = DataTypes.FIELD("bitmap", DataTypes.BITMAP());
+        return DataTypes.STRUCTURED(BitmapOrAccumulator.class, bitmapField);
+    }
+
+    /** The result type is always {@code DataTypes.BITMAP()}. */
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_OR_AGG. */
+    public static class BitmapOrAccumulator {
+
+        /** Running OR of the inputs; {@code null} while no non-null bitmap has been folded in. */
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            final BitmapOrAccumulator other = (BitmapOrAccumulator) obj;
+            return Objects.equals(bitmap, other.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    /** Creates a new accumulator; its {@code bitmap} field is initially {@code null}. */
+    @Override
+    public BitmapOrAccumulator createAccumulator() {
+        return new BitmapOrAccumulator();
+    }
+
+    /** Resets the accumulator by setting its bitmap reference back to {@code null}. */
+    public void resetAccumulator(BitmapOrAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    /**
+     * Returns {@code Bitmap.from(acc.bitmap)} as the aggregate result.
+     *
+     * <p>NOTE(review): {@code acc.bitmap} stays {@code null} while only {@code null} inputs
+     * arrive; presumably {@code Bitmap.from} maps that to a {@code null} result - confirm.
+     */
+    @Override
+    public Bitmap getValue(BitmapOrAccumulator acc) {
+        return Bitmap.from(acc.bitmap);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Folds one input bitmap into the accumulator; {@code null} inputs are ignored.
+     *
+     * <p>If the accumulator holds no bitmap yet it is seeded with {@code
+     * RoaringBitmapData.from(bitmap)}, otherwise the input is applied with {@code or}.
+     */
+    public void accumulate(BitmapOrAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap != null) {
+            if (acc.bitmap == null) {
+                acc.bitmap = RoaringBitmapData.from(bitmap);
+            } else {
+                acc.bitmap.or(bitmap);
+            }
+        }
+    }
+
+    /**
+     * Merges the given partial accumulators into {@code acc}.
+     *
+     * <p>Partials without a bitmap are skipped. The first partial that has one seeds {@code acc}
+     * when it is still empty; every other one is applied with {@code or}.
+     */
+    public void merge(BitmapOrAccumulator acc, Iterable<BitmapOrAccumulator> its) {
+        for (BitmapOrAccumulator partial : its) {
+            if (partial.bitmap == null) {
+                continue;
+            }
+            if (acc.bitmap == null) {
+                acc.bitmap = RoaringBitmapData.from(partial.bitmap);
+            } else {
+                acc.bitmap.or(partial.bitmap);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
new file mode 100644
index 00000000000..d4299c21511
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_OR_AGG with retraction aggregate function. */
+@Internal
+public final class BitmapOrWithRetractAggFunction
+        extends BuiltInAggregateFunction<
+                Bitmap, 
BitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    /**
+     * Creates the function for the given argument type.
+     *
+     * @param valueType logical type of the argument; it is converted with {@code
+     *     toInternalDataType} and kept as the argument data type
+     */
+    public BitmapOrWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a one-element list holding the argument data type set in the constructor. */
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /**
+     * Describes the accumulator as a structured type with a non-null INT {@code bitmapCount}, a
+     * non-null BITMAP {@code bitmap} and a {@code valueCount} map view from non-null INT to
+     * non-null INT.
+     */
+    @Override
+    public DataType getAccumulatorDataType() {
+        final DataType countsType =
+                MapView.newMapViewDataType(DataTypes.INT().notNull(), DataTypes.INT().notNull());
+        return DataTypes.STRUCTURED(
+                BitmapOrWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()),
+                DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()),
+                DataTypes.FIELD("valueCount", countsType));
+    }
+
+    /** The result type is always {@code DataTypes.BITMAP()}. */
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_OR_AGG with retraction. */
+    public static class BitmapOrWithRetractAccumulator {
+
+        /** Net number of non-null bitmaps: accumulate increments it, retract decrements it. */
+        public int bitmapCount = 0;
+
+        /** Materialized result; it should reflect the actual data based on {@code valueCount}. */
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+
+        /** Net occurrence count per bitmap value; an entry is removed when it reaches zero. */
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            final BitmapOrWithRetractAccumulator other = (BitmapOrWithRetractAccumulator) obj;
+            if (bitmapCount != other.bitmapCount) {
+                return false;
+            }
+            return Objects.equals(bitmap, other.bitmap)
+                    && Objects.equals(valueCount, other.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, bitmap, valueCount);
+        }
+    }
+
+    /** Creates a new accumulator: zero bitmap count, empty bitmap, empty value-count view. */
+    @Override
+    public BitmapOrWithRetractAccumulator createAccumulator() {
+        return new BitmapOrWithRetractAccumulator();
+    }
+
+    /** Resets the accumulator: clears the bitmap and the value counts, zeroes the bitmap count. */
+    public void resetAccumulator(BitmapOrWithRetractAccumulator acc) {
+        acc.bitmap.clear();
+        acc.valueCount.clear();
+        acc.bitmapCount = 0;
+    }
+
+    /**
+     * Returns the current result.
+     *
+     * @return {@code null} when the net bitmap count is not positive; otherwise {@code
+     *     RoaringBitmapData.from(acc.bitmap)}
+     */
+    @Override
+    public Bitmap getValue(BitmapOrWithRetractAccumulator acc) {
+        if (acc.bitmapCount <= 0) {
+            return null;
+        }
+        return RoaringBitmapData.from(acc.bitmap);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Adds one bitmap: increments the bitmap count and the count of every value it contains.
+     *
+     * <p>{@code null} inputs are ignored. A count that lands on zero is removed; otherwise it is
+     * stored, and a value whose count becomes exactly 1 is added to the materialized bitmap. Map
+     * view failures are rethrown as {@link FlinkRuntimeException}.
+     */
+    public void accumulate(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount++;
+
+        RoaringBitmapData.toRoaringBitmapData(bitmap)
+                .forEach(
+                        element -> {
+                            try {
+                                final Integer previous = acc.valueCount.get(element);
+                                final int updated = previous == null ? 1 : previous + 1;
+
+                                if (updated == 0) {
+                                    acc.valueCount.remove(element);
+                                    return;
+                                }
+
+                                acc.valueCount.put(element, updated);
+                                // add value to bitmap if count changes from 0 to 1 or expires
+                                if (updated == 1) {
+                                    acc.bitmap.add(element);
+                                }
+                            } catch (Exception e) {
+                                throw new FlinkRuntimeException(e);
+                            }
+                        });
+    }
+
+    /**
+     * Retracts one bitmap: decrements the bitmap count and the count of every value it contains.
+     *
+     * <p>{@code null} inputs are ignored. A count that lands on zero is removed, any other count
+     * is stored. The value is removed from the materialized bitmap when its count becomes 0 or
+     * -1. Map view failures are rethrown as {@link FlinkRuntimeException}.
+     */
+    public void retract(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount--;
+
+        RoaringBitmapData.toRoaringBitmapData(bitmap)
+                .forEach(
+                        element -> {
+                            try {
+                                final Integer previous = acc.valueCount.get(element);
+                                final int updated = previous == null ? -1 : previous - 1;
+
+                                if (updated == 0) {
+                                    acc.valueCount.remove(element);
+                                } else {
+                                    acc.valueCount.put(element, updated);
+                                }
+
+                                // remove value from bitmap if count changes from 1 to 0,
+                                // or if count expires (new count is -1)
+                                if (updated == 0 || updated == -1) {
+                                    acc.bitmap.remove(element);
+                                }
+                            } catch (Exception e) {
+                                throw new FlinkRuntimeException(e);
+                            }
+                        });
+    }
+
+    /**
+     * Merges the given partial accumulators into {@code acc}.
+     *
+     * <p>Bitmap counts are summed. Per-value counts are summed entry by entry, and the
+     * materialized bitmap is adjusted from the sign change of each count. The partials' own
+     * bitmaps are not read.
+     */
+    public void merge(
+            BitmapOrWithRetractAccumulator acc, Iterable<BitmapOrWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapOrWithRetractAccumulator partial : its) {
+            acc.bitmapCount += partial.bitmapCount;
+            for (Map.Entry<Integer, Integer> counted : partial.valueCount.entries()) {
+                final Integer key = counted.getKey();
+                // delta != 0
+                final Integer delta = counted.getValue();
+
+                final Integer stored = acc.valueCount.get(key);
+                final int merged = stored == null ? delta : stored + delta;
+
+                if (merged == 0) {
+                    acc.valueCount.remove(key);
+
+                    if (merged > delta) {
+                        // previous count > 0, value is in bitmap
+                        acc.bitmap.remove(key);
+                    }
+                } else {
+                    acc.valueCount.put(key, merged);
+
+                    if (0 < merged && merged <= delta) {
+                        // previous count < 0, value is not in bitmap
+                        // previous count = 0, unknown (expiration)
+                        acc.bitmap.add(key);
+                    } else if (0 > merged && merged >= delta) {
+                        // previous count > 0, value is in bitmap
+                        // previous count = 0, unknown (expiration)
+                        acc.bitmap.remove(key);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
new file mode 100644
index 00000000000..766362f424d
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in BITMAP_XOR_AGG aggregate function. */
+@Internal
+public final class BitmapXorAggFunction
+        extends BuiltInAggregateFunction<Bitmap, 
BitmapXorAggFunction.BitmapXorAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    /**
+     * Creates the function for the given argument type.
+     *
+     * @param valueType logical type of the argument; it is converted with {@code
+     *     toInternalDataType} and kept as the argument data type
+     */
+    public BitmapXorAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a one-element list holding the argument data type set in the constructor. */
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /** Describes the accumulator as a structured type with a single BITMAP field {@code bitmap}. */
+    @Override
+    public DataType getAccumulatorDataType() {
+        final DataTypes.Field bitmapField = DataTypes.FIELD("bitmap", DataTypes.BITMAP());
+        return DataTypes.STRUCTURED(BitmapXorAccumulator.class, bitmapField);
+    }
+
+    /** The result type is always {@code DataTypes.BITMAP()}. */
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_XOR_AGG. */
+    public static class BitmapXorAccumulator {
+
+        /** Running XOR of the inputs; {@code null} while no non-null bitmap has been folded in. */
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            final BitmapXorAccumulator other = (BitmapXorAccumulator) obj;
+            return Objects.equals(bitmap, other.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    /** Creates a new accumulator; its {@code bitmap} field is initially {@code null}. */
+    @Override
+    public BitmapXorAccumulator createAccumulator() {
+        return new BitmapXorAccumulator();
+    }
+
+    /** Resets the accumulator by setting its bitmap reference back to {@code null}. */
+    public void resetAccumulator(BitmapXorAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    /**
+     * Returns {@code Bitmap.from(acc.bitmap)} as the aggregate result.
+     *
+     * <p>NOTE(review): {@code acc.bitmap} stays {@code null} while only {@code null} inputs
+     * arrive; presumably {@code Bitmap.from} maps that to a {@code null} result - confirm.
+     */
+    @Override
+    public Bitmap getValue(BitmapXorAccumulator acc) {
+        return Bitmap.from(acc.bitmap);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Folds one input bitmap into the accumulator; {@code null} inputs are ignored.
+     *
+     * <p>If the accumulator holds no bitmap yet it is seeded with {@code
+     * RoaringBitmapData.from(bitmap)}, otherwise the input is applied with {@code xor}.
+     */
+    public void accumulate(BitmapXorAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap != null) {
+            if (acc.bitmap == null) {
+                acc.bitmap = RoaringBitmapData.from(bitmap);
+            } else {
+                acc.bitmap.xor(bitmap);
+            }
+        }
+    }
+
+    /**
+     * Merges the given partial accumulators into {@code acc}.
+     *
+     * <p>Partials without a bitmap are skipped. The first partial that has one seeds {@code acc}
+     * when it is still empty; every other one is applied with {@code xor}.
+     */
+    public void merge(BitmapXorAccumulator acc, Iterable<BitmapXorAccumulator> its) {
+        for (BitmapXorAccumulator partial : its) {
+            if (partial.bitmap == null) {
+                continue;
+            }
+            if (acc.bitmap == null) {
+                acc.bitmap = RoaringBitmapData.from(partial.bitmap);
+            } else {
+                acc.bitmap.xor(partial.bitmap);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
new file mode 100644
index 00000000000..d30e52beebb
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/**
+ * Built-in BITMAP_XOR_AGG aggregate function with retraction support.
+ *
+ * <p>Accumulating and retracting a value both XOR it into the accumulator's bitmap. A separate
+ * counter of the non-null inputs currently aggregated lets {@code getValue} return {@code null}
+ * whenever that counter is not positive.
+ */
+@Internal
+public final class BitmapXorWithRetractAggFunction
+        extends BuiltInAggregateFunction<
+                Bitmap, BitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    /**
+     * Creates the function for the given argument type.
+     *
+     * @param valueType logical type of the aggregated argument; converted with {@code
+     *     toInternalDataType} and reported by {@link #getArgumentDataTypes()}
+     */
+    public BitmapXorWithRetractAggFunction(LogicalType valueType) {
+        valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a single-element list holding the argument data type given at construction. */
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    /**
+     * Describes the accumulator as a structured type with a non-null INT {@code bitmapCount} field
+     * and a non-null BITMAP {@code bitmap} field.
+     */
+    @Override
+    public DataType getAccumulatorDataType() {
+        final DataType countType = DataTypes.INT().notNull();
+        final DataType bitmapType = DataTypes.BITMAP().notNull();
+        return DataTypes.STRUCTURED(
+                BitmapXorWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", countType),
+                DataTypes.FIELD("bitmap", bitmapType));
+    }
+
+    /** Returns the BITMAP data type as the result type. */
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.BITMAP();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_XOR_AGG with retraction. */
+    public static class BitmapXorWithRetractAccumulator {
+
+        /** Incremented on accumulate and decremented on retract for every non-null input. */
+        public int bitmapCount = 0;
+
+        /** Running XOR of the inputs; starts from {@code RoaringBitmapData.empty()}. */
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+
+        /** Two accumulators are equal when they have the same class, count and bitmap. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapXorWithRetractAccumulator other = (BitmapXorWithRetractAccumulator) obj;
+            if (bitmapCount != other.bitmapCount) {
+                return false;
+            }
+            return Objects.equals(bitmap, other.bitmap);
+        }
+
+        /** Hashes the same two fields that {@link #equals(Object)} compares. */
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, bitmap);
+        }
+    }
+
+    /** Creates a new accumulator with its field defaults (count 0, initial bitmap). */
+    @Override
+    public BitmapXorWithRetractAccumulator createAccumulator() {
+        return new BitmapXorWithRetractAccumulator();
+    }
+
+    /**
+     * Resets the accumulator in place: the count goes back to 0 and the bitmap is cleared.
+     *
+     * @param acc the accumulator to reset
+     */
+    public void resetAccumulator(BitmapXorWithRetractAccumulator acc) {
+        acc.bitmapCount = 0;
+        acc.bitmap.clear();
+    }
+
+    /**
+     * Produces the aggregation result.
+     *
+     * @param acc the accumulator to read
+     * @return {@code null} when the count is not positive, otherwise the value of {@code
+     *     RoaringBitmapData.from} applied to the accumulated bitmap
+     */
+    @Override
+    public Bitmap getValue(BitmapXorWithRetractAccumulator acc) {
+        if (acc.bitmapCount <= 0) {
+            return null;
+        }
+        return RoaringBitmapData.from(acc.bitmap);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Adds one input: increments the count and XORs the bitmap into the accumulator.
+     *
+     * @param acc the accumulator to update
+     * @param bitmap the input value; {@code null} inputs are ignored
+     */
+    public void accumulate(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap != null) {
+            acc.bitmapCount++;
+            acc.bitmap.xor(bitmap);
+        }
+    }
+
+    /**
+     * Removes one input: decrements the count and XORs the bitmap into the accumulator, the same
+     * bitmap operation that {@code accumulate} applies.
+     *
+     * @param acc the accumulator to update
+     * @param bitmap the retracted value; {@code null} inputs are ignored
+     */
+    public void retract(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap != null) {
+            acc.bitmapCount--;
+            acc.bitmap.xor(bitmap);
+        }
+    }
+
+    /**
+     * Folds the given partial accumulators into {@code acc} by summing their counts and XOR-ing
+     * their bitmaps.
+     *
+     * @param acc the accumulator that receives the merged result
+     * @param its the partial accumulators to merge
+     */
+    public void merge(
+            BitmapXorWithRetractAccumulator acc, Iterable<BitmapXorWithRetractAccumulator> its) {
+        for (BitmapXorWithRetractAccumulator partial : its) {
+            acc.bitmapCount += partial.bitmapCount;
+            acc.bitmap.xor(partial.bitmap);
+        }
+    }
+}


Reply via email to