This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62e7163f879389ba5205106f64fd5f6f99e2fb98
Author: dylanhz <[email protected]>
AuthorDate: Tue Mar 24 02:04:46 2026 +0800

    [FLINK-39187][table] Add new built-in functions to allow direct cardinality 
return for bitmap aggregation
---
 docs/data/sql_functions.yml                        |  36 +++++
 docs/data/sql_functions_zh.yml                     |  44 +++++-
 .../docs/reference/pyflink.table/expressions.rst   |   4 +
 flink-python/pyflink/table/expression.py           |  39 ++++++
 .../pyflink/table/tests/test_expression.py         |   5 +
 .../flink/table/api/internal/BaseExpressions.java  |  47 +++++++
 .../functions/BuiltInFunctionDefinitions.java      |  48 +++++++
 .../planner/plan/utils/AggFunctionFactory.scala    |  68 ++++++++--
 .../planner/functions/BitmapAggFunctionITCase.java | 149 ++++++++++++---------
 ...tion.java => AbstractBitmapAndAggFunction.java} |  64 ++++++---
 ...> AbstractBitmapAndWithRetractAggFunction.java} | 110 ++++++++++-----
 ...on.java => AbstractBitmapBuildAggFunction.java} |  59 ++++++--
 ...AbstractBitmapBuildWithRetractAggFunction.java} |  65 ++++++---
 ...ction.java => AbstractBitmapOrAggFunction.java} |  64 ++++++---
 ...=> AbstractBitmapOrWithRetractAggFunction.java} |  65 ++++++---
 ...tion.java => AbstractBitmapXorAggFunction.java} |  64 ++++++---
 ...> AbstractBitmapXorWithRetractAggFunction.java} |  65 ++++++---
 17 files changed, 782 insertions(+), 214 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index f1d6c30a418..7d7bc0768e4 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1579,6 +1579,16 @@ bitmapagg:
       `bitmap BITMAP`
       
       Returns a `BITMAP`.
+  - sql: BITMAP_AND_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapAndCardinalityAgg()
+    description: |
+      Aggregates the AND (intersection) of multiple bitmaps and returns its 
64-bit cardinality.
+      
+      NOTE: The retraction variant of this function may have significant 
performance overhead with large bitmaps.
+      
+      `bitmap BITMAP`
+      
+      Returns a `BIGINT`.
   - sql: BITMAP_BUILD_AGG(value)
     table: value.bitmapBuildAgg()
     description: |
@@ -1587,6 +1597,14 @@ bitmapagg:
       `value INT`
       
       Returns a `BITMAP`.
+  - sql: BITMAP_BUILD_CARDINALITY_AGG(value)
+    table: value.bitmapBuildCardinalityAgg()
+    description: |
+      Aggregates 32-bit integers into a bitmap and returns its 64-bit 
cardinality.
+      
+      `value INT`
+      
+      Returns a `BIGINT`.
   - sql: BITMAP_OR_AGG(bitmap)
     table: bitmap.bitmapOrAgg()
     description: |
@@ -1597,6 +1615,16 @@ bitmapagg:
       `bitmap BITMAP`
       
       Returns a `BITMAP`.
+  - sql: BITMAP_OR_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapOrCardinalityAgg()
+    description: |
+      Aggregates the OR (union) of multiple bitmaps and returns its 64-bit 
cardinality.
+      
+      NOTE: The retraction variant of this function may have significant 
performance overhead with large bitmaps.
+      
+      `bitmap BITMAP`
+      
+      Returns a `BIGINT`.
   - sql: BITMAP_XOR_AGG(bitmap)
     table: bitmap.bitmapXorAgg()
     description: |
@@ -1605,6 +1633,14 @@ bitmapagg:
       `bitmap BITMAP`
       
       Returns a `BITMAP`.
+  - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapXorCardinalityAgg()
+    description: |
+      Aggregates the XOR (symmetric difference) of multiple bitmaps and 
returns its 64-bit cardinality.
+      
+      `bitmap BITMAP`
+      
+      Returns a `BIGINT`.
 
 catalog:
   - sql: CURRENT_DATABASE()
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 8278e89677f..539e78168e1 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1439,7 +1439,7 @@ bitmap:
   - sql: BITMAP_AND(bitmap1, bitmap2)
     table: bitmap1.bitmapAnd(bitmap2)
     description: |
-      计算两个位图的交集 (AND)。
+      计算两个位图的交集(AND)。
             
       `bitmap1 BITMAP, bitmap2 BITMAP`
       
@@ -1447,7 +1447,7 @@ bitmap:
   - sql: BITMAP_ANDNOT(bitmap1, bitmap2)
     table: bitmap1.bitmapAndnot(bitmap2)
     description: |
-      计算两个位图的差集 (AND NOT)。
+      计算两个位图的差集(AND NOT)。
       
       `bitmap1 BITMAP, bitmap2 BITMAP`
       
@@ -1481,7 +1481,7 @@ bitmap:
   - sql: BITMAP_OR(bitmap1, bitmap2)
     table: bitmap1.bitmapOr(bitmap2)
     description: |
-      计算两个位图的并集 (OR)。
+      计算两个位图的并集(OR)。
       
       `bitmap1 BITMAP, bitmap2 BITMAP`
       
@@ -1521,7 +1521,7 @@ bitmap:
   - sql: BITMAP_XOR(bitmap1, bitmap2)
     table: bitmap1.bitmapXor(bitmap2)
     description: |
-      计算两个位图的异或 (XOR)。
+      计算两个位图的异或(XOR)。
       
       `bitmap1 BITMAP, bitmap2 BITMAP`
       
@@ -1663,6 +1663,16 @@ bitmapagg:
       `bitmap BITMAP`
       
       返回一个 `BITMAP`。
+  - sql: BITMAP_AND_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapAndCardinalityAgg()
+    description: |
+      聚合多个位图的交集(AND)并返回其 64 位基数。
+      
+      注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。
+      
+      `bitmap BITMAP`
+      
+      返回一个 `BIGINT`。
   - sql: BITMAP_BUILD_AGG(value)
     table: value.bitmapBuildAgg()
     description: |
@@ -1671,6 +1681,14 @@ bitmapagg:
       `value INT`
       
       返回一个 `BITMAP`。
+  - sql: BITMAP_BUILD_CARDINALITY_AGG(value)
+    table: value.bitmapBuildCardinalityAgg()
+    description: |
+      将 32 位整数聚合成位图并返回其 64 位基数。
+      
+      `value INT`
+      
+      返回一个 `BIGINT`。
   - sql: BITMAP_OR_AGG(bitmap)
     table: bitmap.bitmapOrAgg()
     description: |
@@ -1681,6 +1699,16 @@ bitmapagg:
       `bitmap BITMAP`
       
       返回一个 `BITMAP`。
+  - sql: BITMAP_OR_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapOrCardinalityAgg()
+    description: |
+      聚合多个位图的并集(OR)并返回其 64 位基数。
+      
+      注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。
+      
+      `bitmap BITMAP`
+      
+      返回一个 `BIGINT`。
   - sql: BITMAP_XOR_AGG(bitmap)
     table: bitmap.bitmapXorAgg()
     description: |
@@ -1689,6 +1717,14 @@ bitmapagg:
       `bitmap BITMAP`
       
       返回一个 `BITMAP`。
+  - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap)
+    table: bitmap.bitmapXorCardinalityAgg()
+    description: |
+      聚合多个位图的异或(XOR)并返回其 64 位基数。
+      
+      `bitmap BITMAP`
+      
+      返回一个 `BIGINT`。
 
 catalog:
   - sql: CURRENT_DATABASE()
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index c54edf08ca2..e2aeb34f411 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -348,14 +348,18 @@ Bitmap functions
     Expression.bitmap_and
     Expression.bitmap_andnot
     Expression.bitmap_and_agg
+    Expression.bitmap_and_cardinality_agg
     Expression.bitmap_build
     Expression.bitmap_build_agg
+    Expression.bitmap_build_cardinality_agg
     Expression.bitmap_cardinality
     Expression.bitmap_from_bytes
     Expression.bitmap_or
     Expression.bitmap_or_agg
+    Expression.bitmap_or_cardinality_agg
     Expression.bitmap_to_array
     Expression.bitmap_to_bytes
     Expression.bitmap_to_string
     Expression.bitmap_xor
     Expression.bitmap_xor_agg
+    Expression.bitmap_xor_cardinality_agg
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index ff3628f0716..04649a64727 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -2310,6 +2310,17 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapAndAgg")(self)
 
+    def bitmap_and_cardinality_agg(self):
+        """
+        Aggregates the AND (intersection) of multiple bitmaps and returns its 
64-bit cardinality.
+
+        NOTE: The retraction variant of this function may have significant 
performance overhead
+        with large bitmaps.
+
+        :return: a BIGINT expression
+        """
+        return _unary_op("bitmapAndCardinalityAgg")(self)
+
     def bitmap_build(self) -> 'Expression':
         """
         Creates a bitmap from an array of 32-bit integers.
@@ -2328,6 +2339,14 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapBuildAgg")(self)
 
+    def bitmap_build_cardinality_agg(self):
+        """
+        Aggregates 32-bit integers into a bitmap and returns its 64-bit 
cardinality.
+
+        :return: a BIGINT expression
+        """
+        return _unary_op("bitmapBuildCardinalityAgg")(self)
+
     def bitmap_cardinality(self) -> 'Expression':
         """
         Returns the cardinality of a bitmap.
@@ -2373,6 +2392,17 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapOrAgg")(self)
 
+    def bitmap_or_cardinality_agg(self):
+        """
+        Aggregates the OR (union) of multiple bitmaps and returns its 64-bit 
cardinality.
+
+        NOTE: The retraction variant of this function may have significant 
performance overhead
+        with large bitmaps.
+
+        :return: a BIGINT expression
+        """
+        return _unary_op("bitmapOrCardinalityAgg")(self)
+
     def bitmap_to_array(self) -> 'Expression':
         """
         Converts a bitmap to an array of 32-bit integers, the values are 
sorted by \
@@ -2433,6 +2463,15 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapXorAgg")(self)
 
+    def bitmap_xor_cardinality_agg(self):
+        """
+        Aggregates the XOR (symmetric difference) of multiple bitmaps and 
returns its 64-bit
+        cardinality.
+
+        :return: a BIGINT expression
+        """
+        return _unary_op("bitmapXorCardinalityAgg")(self)
+
 
 # add the docs
 _make_math_log_doc()
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 74329037bd0..e7a2078d1db 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -269,17 +269,22 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2)))
         self.assertEqual("BITMAP_ANDNOT(a, b)", 
str(expr1.bitmap_andnot(expr2)))
         self.assertEqual("BITMAP_AND_AGG(a)", str(expr1.bitmap_and_agg()))
+        self.assertEqual("BITMAP_AND_CARDINALITY_AGG(a)", 
str(expr1.bitmap_and_cardinality_agg()))
         self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build()))
         self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg()))
+        self.assertEqual("BITMAP_BUILD_CARDINALITY_AGG(a)",
+                         str(expr1.bitmap_build_cardinality_agg()))
         self.assertEqual("BITMAP_CARDINALITY(a)", 
str(expr1.bitmap_cardinality()))
         self.assertEqual("BITMAP_FROM_BYTES(a)", 
str(expr1.bitmap_from_bytes()))
         self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2)))
         self.assertEqual("BITMAP_OR_AGG(a)", str(expr1.bitmap_or_agg()))
+        self.assertEqual("BITMAP_OR_CARDINALITY_AGG(a)", 
str(expr1.bitmap_or_cardinality_agg()))
         self.assertEqual("BITMAP_TO_ARRAY(a)", str(expr1.bitmap_to_array()))
         self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes()))
         self.assertEqual("BITMAP_TO_STRING(a)", str(expr1.bitmap_to_string()))
         self.assertEqual("BITMAP_XOR(a, b)", str(expr1.bitmap_xor(expr2)))
         self.assertEqual("BITMAP_XOR_AGG(a)", str(expr1.bitmap_xor_agg()))
+        self.assertEqual("BITMAP_XOR_CARDINALITY_AGG(a)", 
str(expr1.bitmap_xor_cardinality_agg()))
 
     def test_expressions(self):
         expr1 = col('a')
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 90735d6f7fc..94fbd7c101d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -79,17 +79,21 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_AGG;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_AGG;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_ARRAY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_STRING;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_AGG;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
@@ -2656,6 +2660,18 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(BITMAP_AND_AGG, 
toExpr()));
     }
 
+    /**
+     * Aggregates the AND (intersection) of multiple bitmaps and returns its 
64-bit cardinality.
+     *
+     * <p>NOTE: The retraction variant of this function may have significant 
performance overhead
+     * with large bitmaps.
+     *
+     * @return a BIGINT expression
+     */
+    public OutType bitmapAndCardinalityAgg() {
+        return 
toApiSpecificExpression(unresolvedCall(BITMAP_AND_CARDINALITY_AGG, toExpr()));
+    }
+
     /**
      * Creates a bitmap from an array of 32-bit integers.
      *
@@ -2676,6 +2692,15 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_AGG, 
toExpr()));
     }
 
+    /**
+     * Aggregates 32-bit integers into a bitmap and returns its 64-bit 
cardinality.
+     *
+     * @return a BIGINT expression
+     */
+    public OutType bitmapBuildCardinalityAgg() {
+        return 
toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_CARDINALITY_AGG, toExpr()));
+    }
+
     /**
      * Returns the cardinality of a bitmap.
      *
@@ -2727,6 +2752,18 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(BITMAP_OR_AGG, 
toExpr()));
     }
 
+    /**
+     * Aggregates the OR (union) of multiple bitmaps and returns its 64-bit 
cardinality.
+     *
+     * <p>NOTE: The retraction variant of this function may have significant 
performance overhead
+     * with large bitmaps.
+     *
+     * @return a BIGINT expression
+     */
+    public OutType bitmapOrCardinalityAgg() {
+        return 
toApiSpecificExpression(unresolvedCall(BITMAP_OR_CARDINALITY_AGG, toExpr()));
+    }
+
     /**
      * Converts a bitmap to an array of 32-bit integers, the values are sorted 
by {@link
      * Integer#compareUnsigned}.
@@ -2795,4 +2832,14 @@ public abstract class BaseExpressions<InType, OutType> {
     public OutType bitmapXorAgg() {
         return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_AGG, 
toExpr()));
     }
+
+    /**
+     * Aggregates the XOR (symmetric difference) of multiple bitmaps and 
returns its 64-bit
+     * cardinality.
+     *
+     * @return a BIGINT expression
+     */
+    public OutType bitmapXorCardinalityAgg() {
+        return 
toApiSpecificExpression(unresolvedCall(BITMAP_XOR_CARDINALITY_AGG, toExpr()));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index f20f871baf0..e8783ba51c8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -3027,6 +3027,18 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeProvided()
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_AND_CARDINALITY_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_AND_CARDINALITY_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BIGINT()))
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_BUILD =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_BUILD")
@@ -3061,6 +3073,18 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeProvided()
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_BUILD_CARDINALITY_AGG 
=
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_BUILD_CARDINALITY_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("value"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.INTEGER))))
+                    .outputTypeStrategy(explicit(DataTypes.BIGINT()))
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_CARDINALITY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_CARDINALITY")
@@ -3115,6 +3139,18 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeProvided()
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_OR_CARDINALITY_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_OR_CARDINALITY_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BIGINT()))
+                    .runtimeProvided()
+                    .build();
+
     public static final BuiltInFunctionDefinition BITMAP_TO_ARRAY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("BITMAP_TO_ARRAY")
@@ -3181,6 +3217,18 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeProvided()
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_XOR_CARDINALITY_AGG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_XOR_CARDINALITY_AGG")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    .outputTypeStrategy(explicit(DataTypes.BIGINT()))
+                    .runtimeProvided()
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Other functions
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 20bedac6c13..37ba914bacd 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -176,12 +176,20 @@ class AggFunctionFactory(
             createPercentileAggFunction(argTypes)
           case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG =>
             createBitmapBuildAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG =>
+            createBitmapBuildCardinalityAggFunction(argTypes, index)
           case BuiltInFunctionDefinitions.BITMAP_AND_AGG =>
             createBitmapAndAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG =>
+            createBitmapAndCardinalityAggFunction(argTypes, index)
           case BuiltInFunctionDefinitions.BITMAP_OR_AGG =>
             createBitmapOrAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG =>
+            createBitmapOrCardinalityAggFunction(argTypes, index)
           case BuiltInFunctionDefinitions.BITMAP_XOR_AGG =>
             createBitmapXorAggFunction(argTypes, index)
+          case BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG =>
+            createBitmapXorCardinalityAggFunction(argTypes, index)
           // DeclarativeAggregateFunction & UDF
           case _ =>
             bridge.getDefinition.asInstanceOf[UserDefinedFunction]
@@ -662,9 +670,20 @@ class AggFunctionFactory(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (aggCallNeedRetractions(index)) {
-      new BitmapBuildWithRetractAggFunction(argTypes(0))
+      new 
AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAggFunction(argTypes(0))
     } else {
-      new BitmapBuildAggFunction(argTypes(0))
+      new AbstractBitmapBuildAggFunction.BitmapBuildAggFunction(argTypes(0))
+    }
+  }
+
+  private def createBitmapBuildCardinalityAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new 
AbstractBitmapBuildWithRetractAggFunction.BitmapBuildCardinalityWithRetractAggFunction(
+        argTypes(0))
+    } else {
+      new 
AbstractBitmapBuildAggFunction.BitmapBuildCardinalityAggFunction(argTypes(0))
     }
   }
 
@@ -672,9 +691,20 @@ class AggFunctionFactory(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (aggCallNeedRetractions(index)) {
-      new BitmapAndWithRetractAggFunction(argTypes(0))
+      new 
AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAggFunction(argTypes(0))
+    } else {
+      new AbstractBitmapAndAggFunction.BitmapAndAggFunction(argTypes(0))
+    }
+  }
+
+  private def createBitmapAndCardinalityAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new 
AbstractBitmapAndWithRetractAggFunction.BitmapAndCardinalityWithRetractAggFunction(
+        argTypes(0))
     } else {
-      new BitmapAndAggFunction(argTypes(0))
+      new 
AbstractBitmapAndAggFunction.BitmapAndCardinalityAggFunction(argTypes(0))
     }
   }
 
@@ -682,9 +712,20 @@ class AggFunctionFactory(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (aggCallNeedRetractions(index)) {
-      new BitmapOrWithRetractAggFunction(argTypes(0))
+      new 
AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAggFunction(argTypes(0))
     } else {
-      new BitmapOrAggFunction(argTypes(0))
+      new AbstractBitmapOrAggFunction.BitmapOrAggFunction(argTypes(0))
+    }
+  }
+
+  private def createBitmapOrCardinalityAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new 
AbstractBitmapOrWithRetractAggFunction.BitmapOrCardinalityWithRetractAggFunction(
+        argTypes(0))
+    } else {
+      new 
AbstractBitmapOrAggFunction.BitmapOrCardinalityAggFunction(argTypes(0))
     }
   }
 
@@ -692,9 +733,20 @@ class AggFunctionFactory(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (aggCallNeedRetractions(index)) {
-      new BitmapXorWithRetractAggFunction(argTypes(0))
+      new 
AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAggFunction(argTypes(0))
+    } else {
+      new AbstractBitmapXorAggFunction.BitmapXorAggFunction(argTypes(0))
+    }
+  }
+
+  private def createBitmapXorCardinalityAggFunction(
+      argTypes: Array[LogicalType],
+      index: Int): UserDefinedFunction = {
+    if (aggCallNeedRetractions(index)) {
+      new 
AbstractBitmapXorWithRetractAggFunction.BitmapXorCardinalityWithRetractAggFunction(
+        argTypes(0))
     } else {
-      new BitmapXorAggFunction(argTypes(0))
+      new 
AbstractBitmapXorAggFunction.BitmapXorCardinalityAggFunction(argTypes(0))
     }
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
index cbd79bfea2a..2d67062b155 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java
@@ -44,6 +44,15 @@ import static org.apache.flink.types.RowKind.UPDATE_BEFORE;
 /** Tests for built-in bitmap aggregation functions. */
 class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase {
 
+    private static final Bitmap OVERSIZE_BITMAP;
+
+    static {
+        // size 0x80000000L
+        OVERSIZE_BITMAP = Bitmap.empty();
+        OVERSIZE_BITMAP.add(0L, Integer.MAX_VALUE);
+        OVERSIZE_BITMAP.add(Integer.MAX_VALUE);
+    }
+
     @Override
     Stream<TestSpec> getTestCaseSpecs() {
         final List<TestSpec> specs = new ArrayList<>();
@@ -71,23 +80,26 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, fromArray(-1, 0, 
1), "C"),
                                         Row.ofKind(INSERT, fromArray(-1, -2), 
"C"),
                                         Row.ofKind(INSERT, null, "C"),
-                                        Row.ofKind(INSERT, null, "D")))
+                                        Row.ofKind(INSERT, null, "D"),
+                                        Row.ofKind(INSERT, OVERSIZE_BITMAP, 
"E")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_AND_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_AND_AGG(f0), 
BITMAP_AND_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapAndAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapAndAgg(),
+                                        $("f0").bitmapAndCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", fromArray(3)),
-                                        Row.of("B", fromArray(4, 6)),
-                                        Row.of("C", fromArray(-1)),
-                                        Row.of("D", null))),
+                                        Row.of("A", fromArray(3), 1L),
+                                        Row.of("B", fromArray(4, 6), 2L),
+                                        Row.of("C", fromArray(-1), 1L),
+                                        Row.of("D", null, null),
+                                        Row.of("E", OVERSIZE_BITMAP, 
0x80000000L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG)
                         .withDescription("with retraction")
                         .withSource(
@@ -119,19 +131,20 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(UPDATE_AFTER, fromArray(1, 
2), "C")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_AND_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_AND_AGG(f0), 
BITMAP_AND_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapAndAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapAndAgg(),
+                                        $("f0").bitmapAndCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", null),
-                                        Row.of("B", fromArray(3, 4)),
-                                        Row.of("C", fromArray(1, 2)))),
+                                        Row.of("A", null, null),
+                                        Row.of("B", fromArray(3, 4), 2L),
+                                        Row.of("C", fromArray(1, 2), 2L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG)
                         .withDescription("Validation Error")
                         .withSource(
@@ -170,19 +183,20 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, null, "C")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_BUILD_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_BUILD_AGG(f0), 
BITMAP_BUILD_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapBuildAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapBuildAgg(),
+                                        $("f0").bitmapBuildCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", fromArray(-1, 1, 2, 3, 4)),
-                                        Row.of("B", fromArray(-1, 1, 2)),
-                                        Row.of("C", null))),
+                                        Row.of("A", fromArray(-1, 1, 2, 3, 4), 
5L),
+                                        Row.of("B", fromArray(-1, 1, 2), 3L),
+                                        Row.of("C", null, null))),
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
                         .withDescription("with retraction")
                         .withSource(
@@ -214,19 +228,20 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(UPDATE_AFTER, 1, "C")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_BUILD_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_BUILD_AGG(f0), 
BITMAP_BUILD_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapBuildAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapBuildAgg(),
+                                        $("f0").bitmapBuildCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", null),
-                                        Row.of("B", fromArray(-1, 1, 2)),
-                                        Row.of("C", fromArray(1)))),
+                                        Row.of("A", null, null),
+                                        Row.of("B", fromArray(-1, 1, 2), 3L),
+                                        Row.of("C", fromArray(1), 1L))),
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG)
                         .withDescription("Validation Error")
                         .withSource(
@@ -262,26 +277,29 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, fromArray(-1, 0, 
1), "C"),
                                         Row.ofKind(INSERT, fromArray(-1, -2), 
"C"),
                                         Row.ofKind(INSERT, null, "C"),
-                                        Row.ofKind(INSERT, null, "D")))
+                                        Row.ofKind(INSERT, null, "D"),
+                                        Row.ofKind(INSERT, OVERSIZE_BITMAP, 
"E")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_OR_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_OR_AGG(f0), 
BITMAP_OR_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapOrAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapOrAgg(),
+                                        $("f0").bitmapOrCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", fromArray(1, 2, 3, 4, 5)),
+                                        Row.of("A", fromArray(1, 2, 3, 4, 5), 
5L),
                                         Row.of(
                                                 "B",
-                                                Bitmap.fromArray(
-                                                        new int[] {1, 2, 4, 6, 
8, 12, 16})),
-                                        Row.of("C", fromArray(0, 1, -2, -1)),
-                                        Row.of("D", null))),
+                                                Bitmap.fromArray(new int[] {1, 
2, 4, 6, 8, 12, 16}),
+                                                7L),
+                                        Row.of("C", fromArray(0, 1, -2, -1), 
4L),
+                                        Row.of("D", null, null),
+                                        Row.of("E", OVERSIZE_BITMAP, 
0x80000000L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG)
                         .withDescription("with retraction")
                         .withSource(
@@ -313,19 +331,20 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(UPDATE_AFTER, fromArray(1, 
2), "C")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_OR_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_OR_AGG(f0), 
BITMAP_OR_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapOrAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapOrAgg(),
+                                        $("f0").bitmapOrCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", null),
-                                        Row.of("B", fromArray(0, 2, 3, 4, 5, 
6, -1)),
-                                        Row.of("C", fromArray(1, 2)))),
+                                        Row.of("A", null, null),
+                                        Row.of("B", fromArray(0, 2, 3, 4, 5, 
6, -1), 7L),
+                                        Row.of("C", fromArray(1, 2), 2L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG)
                         .withDescription("Validation Error")
                         .withSource(
@@ -361,23 +380,26 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, fromArray(-1, 0, 
1), "C"),
                                         Row.ofKind(INSERT, fromArray(-1, -2), 
"C"),
                                         Row.ofKind(INSERT, null, "C"),
-                                        Row.ofKind(INSERT, null, "D")))
+                                        Row.ofKind(INSERT, null, "D"),
+                                        Row.ofKind(INSERT, OVERSIZE_BITMAP, 
"E")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_XOR_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_XOR_AGG(f0), 
BITMAP_XOR_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapXorAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapXorAgg(),
+                                        $("f0").bitmapXorCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", fromArray(3, 4, 5)),
-                                        Row.of("B", fromArray(1, 4, 6, 8, 12, 
16)),
-                                        Row.of("C", fromArray(0, 1, -2)),
-                                        Row.of("D", null))),
+                                        Row.of("A", fromArray(3, 4, 5), 3L),
+                                        Row.of("B", fromArray(1, 4, 6, 8, 12, 
16), 6L),
+                                        Row.of("C", fromArray(0, 1, -2), 3L),
+                                        Row.of("D", null, null),
+                                        Row.of("E", OVERSIZE_BITMAP, 
0x80000000L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG)
                         .withDescription("with retraction")
                         .withSource(
@@ -409,19 +431,20 @@ class BitmapAggFunctionITCase extends 
BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(UPDATE_AFTER, fromArray(1, 
2), "C")))
                         .testResult(
                                 source ->
-                                        "SELECT f1, BITMAP_XOR_AGG(f0) FROM "
+                                        "SELECT f1, BITMAP_XOR_AGG(f0), 
BITMAP_XOR_CARDINALITY_AGG(f0) FROM "
                                                 + source
                                                 + " GROUP BY f1",
                                 TableApiAggSpec.groupBySelect(
                                         Collections.singletonList($("f1")),
                                         $("f1"),
-                                        $("f0").bitmapXorAgg()),
-                                ROW(STRING(), BITMAP()),
-                                ROW(STRING(), BITMAP()),
+                                        $("f0").bitmapXorAgg(),
+                                        $("f0").bitmapXorCardinalityAgg()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
+                                ROW(STRING(), BITMAP(), BIGINT()),
                                 Arrays.asList(
-                                        Row.of("A", null),
-                                        Row.of("B", fromArray(0, 3, 4, 6, -1)),
-                                        Row.of("C", fromArray(1, 2)))),
+                                        Row.of("A", null, null),
+                                        Row.of("B", fromArray(0, 3, 4, 6, -1), 
5L),
+                                        Row.of("C", fromArray(1, 2), 2L))),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG)
                         .withDescription("Validation Error")
                         .withSource(
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java
similarity index 68%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java
index 4c1ad8a33b5..66bfb70919e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java
@@ -33,14 +33,14 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_AND_AGG aggregate function. */
+/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */
 @Internal
-public final class BitmapAndAggFunction
-        extends BuiltInAggregateFunction<Bitmap, 
BitmapAndAggFunction.BitmapAndAccumulator> {
+public abstract class AbstractBitmapAndAggFunction<T>
+        extends BuiltInAggregateFunction<T, 
AbstractBitmapAndAggFunction.BitmapAndAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapAndAggFunction(LogicalType valueType) {
+    public AbstractBitmapAndAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -59,16 +59,11 @@ public final class BitmapAndAggFunction
                 BitmapAndAccumulator.class, DataTypes.FIELD("bitmap", 
DataTypes.BITMAP()));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_AND_AGG. */
+    /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */
     public static class BitmapAndAccumulator {
 
         public @Nullable RoaringBitmapData bitmap;
@@ -81,8 +76,7 @@ public final class BitmapAndAggFunction
             if (obj == null || getClass() != obj.getClass()) {
                 return false;
             }
-            BitmapAndAggFunction.BitmapAndAccumulator that =
-                    (BitmapAndAggFunction.BitmapAndAccumulator) obj;
+            BitmapAndAccumulator that = (BitmapAndAccumulator) obj;
             return Objects.equals(bitmap, that.bitmap);
         }
 
@@ -101,11 +95,6 @@ public final class BitmapAndAggFunction
         acc.bitmap = null;
     }
 
-    @Override
-    public Bitmap getValue(BitmapAndAccumulator acc) {
-        return Bitmap.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -133,4 +122,45 @@ public final class BitmapAndAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_AND_AGG aggregate function that returns bitmap. */
+    public static final class BitmapAndAggFunction extends 
AbstractBitmapAndAggFunction<Bitmap> {
+
+        public BitmapAndAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapAndAccumulator acc) {
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_AND_CARDINALITY_AGG aggregate function that returns 
cardinality. */
+    public static final class BitmapAndCardinalityAggFunction
+            extends AbstractBitmapAndAggFunction<Long> {
+
+        public BitmapAndCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapAndAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java
similarity index 70%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java
index dd77b3fb339..c2bc3606ec3 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java
@@ -36,15 +36,15 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_AND_AGG with retraction aggregate function. */
+/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with 
retraction. */
 @Internal
-public final class BitmapAndWithRetractAggFunction
+public abstract class AbstractBitmapAndWithRetractAggFunction<T>
         extends BuiltInAggregateFunction<
-                Bitmap, 
BitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> {
+                T, 
AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapAndWithRetractAggFunction(LogicalType valueType) {
+    public AbstractBitmapAndWithRetractAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -68,16 +68,11 @@ public final class BitmapAndWithRetractAggFunction
                                 DataTypes.INT().notNull(), 
DataTypes.INT().notNull())));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_AND_AGG with retraction. */
+    /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with 
retraction. */
     public static class BitmapAndWithRetractAccumulator {
 
         public int bitmapCount = 0;
@@ -111,26 +106,6 @@ public final class BitmapAndWithRetractAggFunction
         acc.valueCount.clear();
     }
 
-    @Override
-    public Bitmap getValue(BitmapAndWithRetractAccumulator acc) {
-        if (acc.bitmapCount <= 0) {
-            return null;
-        }
-
-        RoaringBitmapData bitmap = RoaringBitmapData.empty();
-        try {
-            for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) 
{
-                if (entry.getValue() == acc.bitmapCount) {
-                    bitmap.add(entry.getKey());
-                }
-            }
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(e);
-        }
-
-        return bitmap;
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -208,4 +183,79 @@ public final class BitmapAndWithRetractAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_AND_AGG with retraction aggregate function that 
returns bitmap. */
+    public static final class BitmapAndWithRetractAggFunction
+            extends AbstractBitmapAndWithRetractAggFunction<Bitmap> {
+
+        public BitmapAndWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapAndWithRetractAccumulator acc) {
+            if (acc.bitmapCount <= 0) {
+                return null;
+            }
+
+            RoaringBitmapData bitmap = RoaringBitmapData.empty();
+            try {
+                for (Map.Entry<Integer, Integer> entry : 
acc.valueCount.entries()) {
+                    if (entry.getValue() == acc.bitmapCount) {
+                        bitmap.add(entry.getKey());
+                    }
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+
+            return bitmap;
+        }
+    }
+
+    /**
+     * Built-in BITMAP_AND_CARDINALITY_AGG with retraction aggregate function 
that returns
+     * cardinality.
+     */
+    public static final class BitmapAndCardinalityWithRetractAggFunction
+            extends AbstractBitmapAndWithRetractAggFunction<Long> {
+
+        public BitmapAndCardinalityWithRetractAggFunction(LogicalType 
valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapAndWithRetractAccumulator acc) {
+            if (acc.bitmapCount <= 0) {
+                return null;
+            }
+
+            long cardinality = 0L;
+            try {
+                for (Map.Entry<Integer, Integer> entry : 
acc.valueCount.entries()) {
+                    if (entry.getValue() == acc.bitmapCount) {
+                        cardinality++;
+                    }
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+
+            return cardinality;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java
similarity index 63%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java
index 4e0215019e8..3d966102cf5 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java
@@ -32,13 +32,14 @@ import java.util.List;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_BUILD_AGG aggregate function. */
+/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG. 
*/
 @Internal
-public final class BitmapBuildAggFunction extends 
BuiltInAggregateFunction<Bitmap, Bitmap> {
+public abstract class AbstractBitmapBuildAggFunction<T>
+        extends BuiltInAggregateFunction<T, Bitmap> {
 
     private final transient DataType valueDataType;
 
-    public BitmapBuildAggFunction(LogicalType valueType) {
+    public AbstractBitmapBuildAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -56,11 +57,6 @@ public final class BitmapBuildAggFunction extends 
BuiltInAggregateFunction<Bitma
         return DataTypes.BITMAP().notNull();
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
@@ -74,11 +70,6 @@ public final class BitmapBuildAggFunction extends 
BuiltInAggregateFunction<Bitma
         acc.clear();
     }
 
-    @Override
-    public Bitmap getValue(Bitmap acc) {
-        return acc.isEmpty() ? null : RoaringBitmapData.from(acc);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -94,4 +85,46 @@ public final class BitmapBuildAggFunction extends 
BuiltInAggregateFunction<Bitma
             acc.or(other);
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_BUILD_AGG aggregate function that returns bitmap. */
+    public static final class BitmapBuildAggFunction
+            extends AbstractBitmapBuildAggFunction<Bitmap> {
+
+        public BitmapBuildAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(Bitmap acc) {
+            return acc.isEmpty() ? null : RoaringBitmapData.from(acc);
+        }
+    }
+
+    /** Built-in BITMAP_BUILD_CARDINALITY_AGG aggregate function that returns 
cardinality. */
+    public static final class BitmapBuildCardinalityAggFunction
+            extends AbstractBitmapBuildAggFunction<Long> {
+
+        public BitmapBuildCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(Bitmap acc) {
+            return acc.isEmpty() ? null : acc.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java
similarity index 77%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java
index c9599b7f31e..fb9c040e264 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java
@@ -35,15 +35,15 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_BUILD_AGG with retraction aggregate function. */
+/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG 
with retraction. */
 @Internal
-public final class BitmapBuildWithRetractAggFunction
+public abstract class AbstractBitmapBuildWithRetractAggFunction<T>
         extends BuiltInAggregateFunction<
-                Bitmap, 
BitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> {
+                T, 
AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapBuildWithRetractAggFunction(LogicalType valueType) {
+    public AbstractBitmapBuildWithRetractAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -67,16 +67,11 @@ public final class BitmapBuildWithRetractAggFunction
                                 DataTypes.INT().notNull(), 
DataTypes.INT().notNull())));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_BUILD_AGG with retraction. */
+    /** Accumulator for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG with 
retraction. */
     public static class BitmapBuildWithRetractAccumulator {
 
         // bitmap should reflect the actual data based on valueCount
@@ -112,11 +107,6 @@ public final class BitmapBuildWithRetractAggFunction
         acc.valueCount.clear();
     }
 
-    @Override
-    public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) {
-        return acc.bitmap.isEmpty() ? null : 
RoaringBitmapData.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -196,4 +186,49 @@ public final class BitmapBuildWithRetractAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_BUILD_AGG with retraction aggregate function that 
returns bitmap. */
+    public static final class BitmapBuildWithRetractAggFunction
+            extends AbstractBitmapBuildWithRetractAggFunction<Bitmap> {
+
+        public BitmapBuildWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) {
+            return acc.bitmap.isEmpty() ? null : 
RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_BUILD_CARDINALITY_AGG with retraction aggregate 
function that returns
+     * cardinality.
+     */
+    public static final class BitmapBuildCardinalityWithRetractAggFunction
+            extends AbstractBitmapBuildWithRetractAggFunction<Long> {
+
+        public BitmapBuildCardinalityWithRetractAggFunction(LogicalType 
valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapBuildWithRetractAccumulator acc) {
+            return acc.bitmap.isEmpty() ? null : 
acc.bitmap.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java
similarity index 69%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java
index 19bdd5db219..65c26029ea7 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java
@@ -33,14 +33,14 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_OR_AGG aggregate function. */
+/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */
 @Internal
-public final class BitmapOrAggFunction
-        extends BuiltInAggregateFunction<Bitmap, 
BitmapOrAggFunction.BitmapOrAccumulator> {
+public abstract class AbstractBitmapOrAggFunction<T>
+        extends BuiltInAggregateFunction<T, 
AbstractBitmapOrAggFunction.BitmapOrAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapOrAggFunction(LogicalType valueType) {
+    public AbstractBitmapOrAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -59,16 +59,11 @@ public final class BitmapOrAggFunction
                 BitmapOrAccumulator.class, DataTypes.FIELD("bitmap", 
DataTypes.BITMAP()));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_OR_AGG. */
+    /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */
     public static class BitmapOrAccumulator {
 
         public @Nullable RoaringBitmapData bitmap;
@@ -81,8 +76,7 @@ public final class BitmapOrAggFunction
             if (obj == null || getClass() != obj.getClass()) {
                 return false;
             }
-            BitmapOrAggFunction.BitmapOrAccumulator that =
-                    (BitmapOrAggFunction.BitmapOrAccumulator) obj;
+            BitmapOrAccumulator that = (BitmapOrAccumulator) obj;
             return Objects.equals(bitmap, that.bitmap);
         }
 
@@ -101,11 +95,6 @@ public final class BitmapOrAggFunction
         acc.bitmap = null;
     }
 
-    @Override
-    public Bitmap getValue(BitmapOrAccumulator acc) {
-        return Bitmap.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -133,4 +122,45 @@ public final class BitmapOrAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_OR_AGG aggregate function that returns bitmap. */
+    public static final class BitmapOrAggFunction extends 
AbstractBitmapOrAggFunction<Bitmap> {
+
+        public BitmapOrAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapOrAccumulator acc) {
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_OR_CARDINALITY_AGG aggregate function that returns 
cardinality. */
+    public static final class BitmapOrCardinalityAggFunction
+            extends AbstractBitmapOrAggFunction<Long> {
+
+        public BitmapOrCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapOrAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java
similarity index 81%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java
index d4299c21511..af807c18e4e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java
@@ -36,15 +36,15 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_OR_AGG with retraction aggregate function. */
+/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with 
retraction. */
 @Internal
-public final class BitmapOrWithRetractAggFunction
+public abstract class AbstractBitmapOrWithRetractAggFunction<T>
         extends BuiltInAggregateFunction<
-                Bitmap, 
BitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> {
+                T, 
AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapOrWithRetractAggFunction(LogicalType valueType) {
+    public AbstractBitmapOrWithRetractAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -69,16 +69,11 @@ public final class BitmapOrWithRetractAggFunction
                                 DataTypes.INT().notNull(), 
DataTypes.INT().notNull())));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_OR_AGG with retraction. */
+    /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with 
retraction. */
     public static class BitmapOrWithRetractAccumulator {
 
         public int bitmapCount = 0;
@@ -117,11 +112,6 @@ public final class BitmapOrWithRetractAggFunction
         acc.valueCount.clear();
     }
 
-    @Override
-    public Bitmap getValue(BitmapOrWithRetractAccumulator acc) {
-        return acc.bitmapCount <= 0 ? null : 
RoaringBitmapData.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -226,4 +216,49 @@ public final class BitmapOrWithRetractAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_OR_AGG with retraction aggregate function that returns 
bitmap. */
+    public static final class BitmapOrWithRetractAggFunction
+            extends AbstractBitmapOrWithRetractAggFunction<Bitmap> {
+
+        public BitmapOrWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapOrWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : 
RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_OR_CARDINALITY_AGG with retraction aggregate function 
that returns
+     * cardinality.
+     */
+    public static final class BitmapOrCardinalityWithRetractAggFunction
+            extends AbstractBitmapOrWithRetractAggFunction<Long> {
+
+        public BitmapOrCardinalityWithRetractAggFunction(LogicalType 
valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapOrWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : 
acc.bitmap.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java
similarity index 68%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java
index 766362f424d..3890d257192 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java
@@ -33,14 +33,14 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_XOR_AGG aggregate function. */
+/** Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG. */
 @Internal
-public final class BitmapXorAggFunction
-        extends BuiltInAggregateFunction<Bitmap, 
BitmapXorAggFunction.BitmapXorAccumulator> {
+public abstract class AbstractBitmapXorAggFunction<T>
+        extends BuiltInAggregateFunction<T, 
AbstractBitmapXorAggFunction.BitmapXorAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapXorAggFunction(LogicalType valueType) {
+    public AbstractBitmapXorAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -59,16 +59,11 @@ public final class BitmapXorAggFunction
                 BitmapXorAccumulator.class, DataTypes.FIELD("bitmap", 
DataTypes.BITMAP()));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_XOR_AGG. */
+    /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG. */
     public static class BitmapXorAccumulator {
 
         public @Nullable RoaringBitmapData bitmap;
@@ -81,8 +76,7 @@ public final class BitmapXorAggFunction
             if (obj == null || getClass() != obj.getClass()) {
                 return false;
             }
-            BitmapXorAggFunction.BitmapXorAccumulator that =
-                    (BitmapXorAggFunction.BitmapXorAccumulator) obj;
+            BitmapXorAccumulator that = (BitmapXorAccumulator) obj;
             return Objects.equals(bitmap, that.bitmap);
         }
 
@@ -101,11 +95,6 @@ public final class BitmapXorAggFunction
         acc.bitmap = null;
     }
 
-    @Override
-    public Bitmap getValue(BitmapXorAccumulator acc) {
-        return Bitmap.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -133,4 +122,45 @@ public final class BitmapXorAggFunction
             }
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_XOR_AGG aggregate function that returns bitmap. */
+    public static final class BitmapXorAggFunction extends 
AbstractBitmapXorAggFunction<Bitmap> {
+
+        public BitmapXorAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapXorAccumulator acc) {
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_XOR_CARDINALITY_AGG aggregate function that returns 
cardinality. */
+    public static final class BitmapXorCardinalityAggFunction
+            extends AbstractBitmapXorAggFunction<Long> {
+
+        public BitmapXorCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapXorAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java
similarity index 69%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java
index d30e52beebb..fed8174dc34 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java
@@ -33,15 +33,15 @@ import java.util.Objects;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
 
-/** Built-in BITMAP_XOR_AGG with retraction aggregate function. */
+/** Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with 
retraction. */
 @Internal
-public final class BitmapXorWithRetractAggFunction
+public abstract class AbstractBitmapXorWithRetractAggFunction<T>
         extends BuiltInAggregateFunction<
-                Bitmap, 
BitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> {
+                T, 
AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> {
 
     private final transient DataType valueDataType;
 
-    public BitmapXorWithRetractAggFunction(LogicalType valueType) {
+    public AbstractBitmapXorWithRetractAggFunction(LogicalType valueType) {
         this.valueDataType = toInternalDataType(valueType);
     }
 
@@ -62,16 +62,11 @@ public final class BitmapXorWithRetractAggFunction
                 DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()));
     }
 
-    @Override
-    public DataType getOutputDataType() {
-        return DataTypes.BITMAP();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Accumulator
     // 
--------------------------------------------------------------------------------------------
 
-    /** Accumulator for BITMAP_XOR_AGG with retraction. */
+    /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with 
retraction. */
     public static class BitmapXorWithRetractAccumulator {
 
         public int bitmapCount = 0;
@@ -105,11 +100,6 @@ public final class BitmapXorWithRetractAggFunction
         acc.bitmap.clear();
     }
 
-    @Override
-    public Bitmap getValue(BitmapXorWithRetractAccumulator acc) {
-        return acc.bitmapCount <= 0 ? null : 
RoaringBitmapData.from(acc.bitmap);
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Runtime
     // 
--------------------------------------------------------------------------------------------
@@ -139,4 +129,49 @@ public final class BitmapXorWithRetractAggFunction
             acc.bitmap.xor(other.bitmap);
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sub-classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_XOR_AGG with retraction aggregate function that 
returns bitmap. */
+    public static final class BitmapXorWithRetractAggFunction
+            extends AbstractBitmapXorWithRetractAggFunction<Bitmap> {
+
+        public BitmapXorWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapXorWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : 
RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_XOR_CARDINALITY_AGG with retraction aggregate function 
that returns
+     * cardinality.
+     */
+    public static final class BitmapXorCardinalityWithRetractAggFunction
+            extends AbstractBitmapXorWithRetractAggFunction<Long> {
+
+        public BitmapXorCardinalityWithRetractAggFunction(LogicalType 
valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapXorWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : 
acc.bitmap.getLongCardinality();
+        }
+    }
 }

Reply via email to