This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62e7163f879389ba5205106f64fd5f6f99e2fb98 Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 02:04:46 2026 +0800 [FLINK-39187][table] Add new built-in functions to allow direct cardinality return for bitmap aggregation --- docs/data/sql_functions.yml | 36 +++++ docs/data/sql_functions_zh.yml | 44 +++++- .../docs/reference/pyflink.table/expressions.rst | 4 + flink-python/pyflink/table/expression.py | 39 ++++++ .../pyflink/table/tests/test_expression.py | 5 + .../flink/table/api/internal/BaseExpressions.java | 47 +++++++ .../functions/BuiltInFunctionDefinitions.java | 48 +++++++ .../planner/plan/utils/AggFunctionFactory.scala | 68 ++++++++-- .../planner/functions/BitmapAggFunctionITCase.java | 149 ++++++++++++--------- ...tion.java => AbstractBitmapAndAggFunction.java} | 64 ++++++--- ...> AbstractBitmapAndWithRetractAggFunction.java} | 110 ++++++++++----- ...on.java => AbstractBitmapBuildAggFunction.java} | 59 ++++++-- ...AbstractBitmapBuildWithRetractAggFunction.java} | 65 ++++++--- ...ction.java => AbstractBitmapOrAggFunction.java} | 64 ++++++--- ...=> AbstractBitmapOrWithRetractAggFunction.java} | 65 ++++++--- ...tion.java => AbstractBitmapXorAggFunction.java} | 64 ++++++--- ...> AbstractBitmapXorWithRetractAggFunction.java} | 65 ++++++--- 17 files changed, 782 insertions(+), 214 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index f1d6c30a418..7d7bc0768e4 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1579,6 +1579,16 @@ bitmapagg: `bitmap BITMAP` Returns a `BITMAP`. + - sql: BITMAP_AND_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapAndCardinalityAgg() + description: | + Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + + NOTE: The retraction variant of this function may have significant performance overhead with large bitmaps. + + `bitmap BITMAP` + + Returns a `BIGINT`. - sql: BITMAP_BUILD_AGG(value) table: value.bitmapBuildAgg() description: | @@ -1587,6 +1597,14 @@ bitmapagg: `value INT` Returns a `BITMAP`. + - sql: BITMAP_BUILD_CARDINALITY_AGG(value) + table: value.bitmapBuildCardinalityAgg() + description: | + Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + + `value INT` + + Returns a `BIGINT`. - sql: BITMAP_OR_AGG(bitmap) table: bitmap.bitmapOrAgg() description: | @@ -1597,6 +1615,16 @@ bitmapagg: `bitmap BITMAP` Returns a `BITMAP`. + - sql: BITMAP_OR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapOrCardinalityAgg() + description: | + Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + + NOTE: The retraction variant of this function may have significant performance overhead with large bitmaps. + + `bitmap BITMAP` + + Returns a `BIGINT`. - sql: BITMAP_XOR_AGG(bitmap) table: bitmap.bitmapXorAgg() description: | @@ -1605,6 +1633,14 @@ bitmapagg: `bitmap BITMAP` Returns a `BITMAP`. + - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapXorCardinalityAgg() + description: | + Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit cardinality. + + `bitmap BITMAP` + + Returns a `BIGINT`. catalog: - sql: CURRENT_DATABASE() diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 8278e89677f..539e78168e1 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1439,7 +1439,7 @@ bitmap: - sql: BITMAP_AND(bitmap1, bitmap2) table: bitmap1.bitmapAnd(bitmap2) description: | - 计算两个位图的交集 (AND)。 + 计算两个位图的交集(AND)。 `bitmap1 BITMAP, bitmap2 BITMAP` @@ -1447,7 +1447,7 @@ bitmap: - sql: BITMAP_ANDNOT(bitmap1, bitmap2) table: bitmap1.bitmapAndnot(bitmap2) description: | - 计算两个位图的差集 (AND NOT)。 + 计算两个位图的差集(AND NOT)。 `bitmap1 BITMAP, bitmap2 BITMAP` @@ -1481,7 +1481,7 @@ bitmap: - sql: BITMAP_OR(bitmap1, bitmap2) table: bitmap1.bitmapOr(bitmap2) description: | - 计算两个位图的并集 (OR)。 + 计算两个位图的并集(OR)。 `bitmap1 BITMAP, bitmap2 BITMAP` @@ -1521,7 +1521,7 @@ bitmap: - sql: BITMAP_XOR(bitmap1, bitmap2) table: bitmap1.bitmapXor(bitmap2) description: | - 计算两个位图的异或 (XOR)。 + 计算两个位图的异或(XOR)。 `bitmap1 BITMAP, bitmap2 BITMAP` @@ -1663,6 +1663,16 @@ bitmapagg: `bitmap BITMAP` 返回一个 `BITMAP`。 + - sql: BITMAP_AND_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapAndCardinalityAgg() + description: | + 聚合多个位图的交集(AND)并返回其 64 位基数。 + + 注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 - sql: BITMAP_BUILD_AGG(value) table: value.bitmapBuildAgg() description: | @@ -1671,6 +1681,14 @@ bitmapagg: `value INT` 返回一个 `BITMAP`。 + - sql: BITMAP_BUILD_CARDINALITY_AGG(value) + table: value.bitmapBuildCardinalityAgg() + description: | + 将 32 位整数聚合成位图并返回其 64 位基数。 + + `value INT` + + 返回一个 `BIGINT`。 - sql: BITMAP_OR_AGG(bitmap) table: bitmap.bitmapOrAgg() description: | @@ -1681,6 +1699,16 @@ bitmapagg: `bitmap BITMAP` 返回一个 `BITMAP`。 + - sql: BITMAP_OR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapOrCardinalityAgg() + description: | + 聚合多个位图的并集(OR)并返回其 64 位基数。 + + 注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 - sql: BITMAP_XOR_AGG(bitmap) table: bitmap.bitmapXorAgg() description: | @@ -1689,6 +1717,14 @@ bitmapagg: `bitmap BITMAP` 返回一个 `BITMAP`。 + - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapXorCardinalityAgg() + description: | + 聚合多个位图的异或(XOR)并返回其 64 位基数。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 catalog: - sql: CURRENT_DATABASE() diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index c54edf08ca2..e2aeb34f411 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -348,14 +348,18 @@ Bitmap functions Expression.bitmap_and Expression.bitmap_andnot Expression.bitmap_and_agg + Expression.bitmap_and_cardinality_agg Expression.bitmap_build Expression.bitmap_build_agg + Expression.bitmap_build_cardinality_agg Expression.bitmap_cardinality Expression.bitmap_from_bytes Expression.bitmap_or Expression.bitmap_or_agg + Expression.bitmap_or_cardinality_agg Expression.bitmap_to_array Expression.bitmap_to_bytes Expression.bitmap_to_string Expression.bitmap_xor Expression.bitmap_xor_agg + Expression.bitmap_xor_cardinality_agg diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index ff3628f0716..04649a64727 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2310,6 +2310,17 @@ class Expression(Generic[T]): """ return _unary_op("bitmapAndAgg")(self) + def bitmap_and_cardinality_agg(self): + """ + Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + + NOTE: The retraction variant of this function may have significant performance overhead + with large bitmaps. + + :return: a BIGINT expression + """ + return _unary_op("bitmapAndCardinalityAgg")(self) + def bitmap_build(self) -> 'Expression': """ Creates a bitmap from an array of 32-bit integers. @@ -2328,6 +2339,14 @@ class Expression(Generic[T]): """ return _unary_op("bitmapBuildAgg")(self) + def bitmap_build_cardinality_agg(self): + """ + Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + + :return: a BIGINT expression + """ + return _unary_op("bitmapBuildCardinalityAgg")(self) + def bitmap_cardinality(self) -> 'Expression': """ Returns the cardinality of a bitmap. @@ -2373,6 +2392,17 @@ class Expression(Generic[T]): """ return _unary_op("bitmapOrAgg")(self) + def bitmap_or_cardinality_agg(self): + """ + Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + + NOTE: The retraction variant of this function may have significant performance overhead + with large bitmaps. + + :return: a BIGINT expression + """ + return _unary_op("bitmapOrCardinalityAgg")(self) + def bitmap_to_array(self) -> 'Expression': """ Converts a bitmap to an array of 32-bit integers, the values are sorted by \ @@ -2433,6 +2463,15 @@ class Expression(Generic[T]): """ return _unary_op("bitmapXorAgg")(self) + def bitmap_xor_cardinality_agg(self): + """ + Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit + cardinality. + + :return: a BIGINT expression + """ + return _unary_op("bitmapXorCardinalityAgg")(self) + # add the docs _make_math_log_doc() diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 74329037bd0..e7a2078d1db 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -269,17 +269,22 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2))) self.assertEqual("BITMAP_ANDNOT(a, b)", str(expr1.bitmap_andnot(expr2))) self.assertEqual("BITMAP_AND_AGG(a)", str(expr1.bitmap_and_agg())) + self.assertEqual("BITMAP_AND_CARDINALITY_AGG(a)", str(expr1.bitmap_and_cardinality_agg())) self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg())) + self.assertEqual("BITMAP_BUILD_CARDINALITY_AGG(a)", + str(expr1.bitmap_build_cardinality_agg())) self.assertEqual("BITMAP_CARDINALITY(a)", str(expr1.bitmap_cardinality())) self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2))) self.assertEqual("BITMAP_OR_AGG(a)", str(expr1.bitmap_or_agg())) + self.assertEqual("BITMAP_OR_CARDINALITY_AGG(a)", str(expr1.bitmap_or_cardinality_agg())) self.assertEqual("BITMAP_TO_ARRAY(a)", str(expr1.bitmap_to_array())) self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes())) self.assertEqual("BITMAP_TO_STRING(a)", str(expr1.bitmap_to_string())) self.assertEqual("BITMAP_XOR(a, b)", str(expr1.bitmap_xor(expr2))) self.assertEqual("BITMAP_XOR_AGG(a)", str(expr1.bitmap_xor_agg())) + self.assertEqual("BITMAP_XOR_CARDINALITY_AGG(a)", str(expr1.bitmap_xor_cardinality_agg())) def test_expressions(self): expr1 = col('a') diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 90735d6f7fc..94fbd7c101d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -79,17 +79,21 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_ARRAY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_STRING; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; @@ -2656,6 +2660,18 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(BITMAP_AND_AGG, toExpr())); } + /** + * Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + * + * <p>NOTE: The retraction variant of this function may have significant performance overhead + * with large bitmaps. + * + * @return a BIGINT expression + */ + public OutType bitmapAndCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_AND_CARDINALITY_AGG, toExpr())); + } + /** * Creates a bitmap from an array of 32-bit integers. * @@ -2676,6 +2692,15 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_AGG, toExpr())); } + /** + * Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + * + * @return a BIGINT expression + */ + public OutType bitmapBuildCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_CARDINALITY_AGG, toExpr())); + } + /** * Returns the cardinality of a bitmap. * @@ -2727,6 +2752,18 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(BITMAP_OR_AGG, toExpr())); } + /** + * Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + * + * <p>NOTE: The retraction variant of this function may have significant performance overhead + * with large bitmaps. + * + * @return a BIGINT expression + */ + public OutType bitmapOrCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_OR_CARDINALITY_AGG, toExpr())); + } + /** * Converts a bitmap to an array of 32-bit integers, the values are sorted by {@link * Integer#compareUnsigned}. @@ -2795,4 +2832,14 @@ public abstract class BaseExpressions<InType, OutType> { public OutType bitmapXorAgg() { return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_AGG, toExpr())); } + + /** + * Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit + * cardinality. + * + * @return a BIGINT expression + */ + public OutType bitmapXorCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_CARDINALITY_AGG, toExpr())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index f20f871baf0..e8783ba51c8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3027,6 +3027,18 @@ public final class BuiltInFunctionDefinitions { .runtimeProvided() .build(); + public static final BuiltInFunctionDefinition BITMAP_AND_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_AND_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_BUILD") @@ -3061,6 +3073,18 @@ public final class BuiltInFunctionDefinitions { .runtimeProvided() .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_BUILD_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("value"), + Collections.singletonList(logical(LogicalTypeRoot.INTEGER)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_CARDINALITY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_CARDINALITY") @@ -3115,6 +3139,18 @@ public final class BuiltInFunctionDefinitions { .runtimeProvided() .build(); + public static final BuiltInFunctionDefinition BITMAP_OR_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_OR_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_TO_ARRAY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_TO_ARRAY") @@ -3181,6 +3217,18 @@ public final class BuiltInFunctionDefinitions { .runtimeProvided() .build(); + public static final BuiltInFunctionDefinition BITMAP_XOR_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_XOR_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 20bedac6c13..37ba914bacd 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -176,12 +176,20 @@ class AggFunctionFactory( createPercentileAggFunction(argTypes) case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG => createBitmapBuildAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG => + createBitmapBuildCardinalityAggFunction(argTypes, index) case BuiltInFunctionDefinitions.BITMAP_AND_AGG => createBitmapAndAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG => + createBitmapAndCardinalityAggFunction(argTypes, index) case BuiltInFunctionDefinitions.BITMAP_OR_AGG => createBitmapOrAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG => + createBitmapOrCardinalityAggFunction(argTypes, index) case BuiltInFunctionDefinitions.BITMAP_XOR_AGG => createBitmapXorAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG => + createBitmapXorCardinalityAggFunction(argTypes, index) // DeclarativeAggregateFunction & UDF case _ => bridge.getDefinition.asInstanceOf[UserDefinedFunction] @@ -662,9 +670,20 @@ class AggFunctionFactory( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (aggCallNeedRetractions(index)) { - new BitmapBuildWithRetractAggFunction(argTypes(0)) + new AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAggFunction(argTypes(0)) } else { - new BitmapBuildAggFunction(argTypes(0)) + new AbstractBitmapBuildAggFunction.BitmapBuildAggFunction(argTypes(0)) + } + } + + private def createBitmapBuildCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapBuildWithRetractAggFunction.BitmapBuildCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapBuildAggFunction.BitmapBuildCardinalityAggFunction(argTypes(0)) } } @@ -672,9 +691,20 @@ class AggFunctionFactory( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (aggCallNeedRetractions(index)) { - new BitmapAndWithRetractAggFunction(argTypes(0)) + new AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapAndAggFunction.BitmapAndAggFunction(argTypes(0)) + } + } + + private def createBitmapAndCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapAndWithRetractAggFunction.BitmapAndCardinalityWithRetractAggFunction( + argTypes(0)) } else { - new BitmapAndAggFunction(argTypes(0)) + new AbstractBitmapAndAggFunction.BitmapAndCardinalityAggFunction(argTypes(0)) } } @@ -682,9 +712,20 @@ class AggFunctionFactory( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (aggCallNeedRetractions(index)) { - new BitmapOrWithRetractAggFunction(argTypes(0)) + new AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAggFunction(argTypes(0)) } else { - new BitmapOrAggFunction(argTypes(0)) + new AbstractBitmapOrAggFunction.BitmapOrAggFunction(argTypes(0)) + } + } + + private def createBitmapOrCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapOrWithRetractAggFunction.BitmapOrCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapOrAggFunction.BitmapOrCardinalityAggFunction(argTypes(0)) } } @@ -692,9 +733,20 @@ class AggFunctionFactory( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (aggCallNeedRetractions(index)) { - new BitmapXorWithRetractAggFunction(argTypes(0)) + new AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapXorAggFunction.BitmapXorAggFunction(argTypes(0)) + } + } + + private def createBitmapXorCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapXorWithRetractAggFunction.BitmapXorCardinalityWithRetractAggFunction( + argTypes(0)) } else { - new BitmapXorAggFunction(argTypes(0)) + new AbstractBitmapXorAggFunction.BitmapXorCardinalityAggFunction(argTypes(0)) } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java index cbd79bfea2a..2d67062b155 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java @@ -44,6 +44,15 @@ import static org.apache.flink.types.RowKind.UPDATE_BEFORE; /** Tests for built-in bitmap aggregation functions. */ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + private static final Bitmap OVERSIZE_BITMAP; + + static { + // size 0x80000000L + OVERSIZE_BITMAP = Bitmap.empty(); + OVERSIZE_BITMAP.add(0L, Integer.MAX_VALUE); + OVERSIZE_BITMAP.add(Integer.MAX_VALUE); + } + @Override Stream<TestSpec> getTestCaseSpecs() { final List<TestSpec> specs = new ArrayList<>(); @@ -71,23 +80,26 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), Row.ofKind(INSERT, fromArray(-1, -2), "C"), Row.ofKind(INSERT, null, "C"), - Row.ofKind(INSERT, null, "D"))) + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) .testResult( source -> - "SELECT f1, BITMAP_AND_AGG(f0) FROM " + "SELECT f1, BITMAP_AND_AGG(f0), BITMAP_AND_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapAndAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapAndAgg(), + $("f0").bitmapAndCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", fromArray(3)), - Row.of("B", fromArray(4, 6)), - Row.of("C", fromArray(-1)), - Row.of("D", null))), + Row.of("A", fromArray(3), 1L), + Row.of("B", fromArray(4, 6), 2L), + Row.of("C", fromArray(-1), 1L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) .withDescription("with retraction") .withSource( @@ -119,19 +131,20 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) .testResult( source -> - "SELECT f1, BITMAP_AND_AGG(f0) FROM " + "SELECT f1, BITMAP_AND_AGG(f0), BITMAP_AND_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapAndAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapAndAgg(), + $("f0").bitmapAndCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", null), - Row.of("B", fromArray(3, 4)), - Row.of("C", fromArray(1, 2)))), + Row.of("A", null, null), + Row.of("B", fromArray(3, 4), 2L), + Row.of("C", fromArray(1, 2), 2L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) .withDescription("Validation Error") .withSource( @@ -170,19 +183,20 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(INSERT, null, "C"))) .testResult( source -> - "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + "SELECT f1, BITMAP_BUILD_AGG(f0), BITMAP_BUILD_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapBuildAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapBuildAgg(), + $("f0").bitmapBuildCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", fromArray(-1, 1, 2, 3, 4)), - Row.of("B", fromArray(-1, 1, 2)), - Row.of("C", null))), + Row.of("A", fromArray(-1, 1, 2, 3, 4), 5L), + Row.of("B", fromArray(-1, 1, 2), 3L), + Row.of("C", null, null))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) .withDescription("with retraction") .withSource( @@ -214,19 +228,20 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(UPDATE_AFTER, 1, "C"))) .testResult( source -> - "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + "SELECT f1, BITMAP_BUILD_AGG(f0), BITMAP_BUILD_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapBuildAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapBuildAgg(), + $("f0").bitmapBuildCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", null), - Row.of("B", fromArray(-1, 1, 2)), - Row.of("C", fromArray(1)))), + Row.of("A", null, null), + Row.of("B", fromArray(-1, 1, 2), 3L), + Row.of("C", fromArray(1), 1L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) .withDescription("Validation Error") .withSource( @@ -262,26 +277,29 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), Row.ofKind(INSERT, fromArray(-1, -2), "C"), Row.ofKind(INSERT, null, "C"), - Row.ofKind(INSERT, null, "D"))) + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) .testResult( source -> - "SELECT f1, BITMAP_OR_AGG(f0) FROM " + "SELECT f1, BITMAP_OR_AGG(f0), BITMAP_OR_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapOrAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapOrAgg(), + $("f0").bitmapOrCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", fromArray(1, 2, 3, 4, 5)), + Row.of("A", fromArray(1, 2, 3, 4, 5), 5L), Row.of( "B", - Bitmap.fromArray( - new int[] {1, 2, 4, 6, 8, 12, 16})), - Row.of("C", fromArray(0, 1, -2, -1)), - Row.of("D", null))), + Bitmap.fromArray(new int[] {1, 2, 4, 6, 8, 12, 16}), + 7L), + Row.of("C", fromArray(0, 1, -2, -1), 4L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) .withDescription("with retraction") .withSource( @@ -313,19 +331,20 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) .testResult( source -> - "SELECT f1, BITMAP_OR_AGG(f0) FROM " + "SELECT f1, BITMAP_OR_AGG(f0), BITMAP_OR_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapOrAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapOrAgg(), + $("f0").bitmapOrCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", null), - Row.of("B", fromArray(0, 2, 3, 4, 5, 6, -1)), - Row.of("C", fromArray(1, 2)))), + Row.of("A", null, null), + Row.of("B", fromArray(0, 2, 3, 4, 5, 6, -1), 7L), + Row.of("C", fromArray(1, 2), 2L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) .withDescription("Validation Error") .withSource( @@ -361,23 +380,26 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), Row.ofKind(INSERT, fromArray(-1, -2), "C"), Row.ofKind(INSERT, null, "C"), - Row.ofKind(INSERT, null, "D"))) + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) .testResult( source -> - "SELECT f1, BITMAP_XOR_AGG(f0) FROM " + "SELECT f1, BITMAP_XOR_AGG(f0), BITMAP_XOR_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapXorAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapXorAgg(), + $("f0").bitmapXorCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", fromArray(3, 4, 5)), - Row.of("B", fromArray(1, 4, 6, 8, 12, 16)), - Row.of("C", fromArray(0, 1, -2)), - Row.of("D", null))), + Row.of("A", fromArray(3, 4, 5), 3L), + Row.of("B", fromArray(1, 4, 6, 8, 12, 16), 6L), + Row.of("C", fromArray(0, 1, -2), 3L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) .withDescription("with retraction") .withSource( @@ -409,19 +431,20 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) .testResult( source -> - "SELECT f1, BITMAP_XOR_AGG(f0) FROM " + "SELECT f1, BITMAP_XOR_AGG(f0), BITMAP_XOR_CARDINALITY_AGG(f0) FROM " + source + " GROUP BY f1", TableApiAggSpec.groupBySelect( Collections.singletonList($("f1")), $("f1"), - $("f0").bitmapXorAgg()), - ROW(STRING(), BITMAP()), - ROW(STRING(), BITMAP()), + $("f0").bitmapXorAgg(), + $("f0").bitmapXorCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), Arrays.asList( - Row.of("A", null), - Row.of("B", fromArray(0, 3, 4, 6, -1)), - Row.of("C", fromArray(1, 2)))), + Row.of("A", null, null), + Row.of("B", fromArray(0, 3, 4, 6, -1), 5L), + Row.of("C", fromArray(1, 2), 2L))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) .withDescription("Validation Error") .withSource( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java similarity index 68% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java index 4c1ad8a33b5..66bfb70919e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java @@ -33,14 +33,14 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_AND_AGG aggregate function. */ +/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */ @Internal -public final class BitmapAndAggFunction - extends BuiltInAggregateFunction<Bitmap, BitmapAndAggFunction.BitmapAndAccumulator> { +public abstract class AbstractBitmapAndAggFunction<T> + extends BuiltInAggregateFunction<T, AbstractBitmapAndAggFunction.BitmapAndAccumulator> { private final transient DataType valueDataType; - public BitmapAndAggFunction(LogicalType valueType) { + public AbstractBitmapAndAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -59,16 +59,11 @@ public final class BitmapAndAggFunction BitmapAndAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_AND_AGG. */ + /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */ public static class BitmapAndAccumulator { public @Nullable RoaringBitmapData bitmap; @@ -81,8 +76,7 @@ public final class BitmapAndAggFunction if (obj == null || getClass() != obj.getClass()) { return false; } - BitmapAndAggFunction.BitmapAndAccumulator that = - (BitmapAndAggFunction.BitmapAndAccumulator) obj; + BitmapAndAccumulator that = (BitmapAndAccumulator) obj; return Objects.equals(bitmap, that.bitmap); } @@ -101,11 +95,6 @@ public final class BitmapAndAggFunction acc.bitmap = null; } - @Override - public Bitmap getValue(BitmapAndAccumulator acc) { - return Bitmap.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -133,4 +122,45 @@ public final class BitmapAndAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_AND_AGG aggregate function that returns bitmap. */ + public static final class BitmapAndAggFunction extends AbstractBitmapAndAggFunction<Bitmap> { + + public BitmapAndAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapAndAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + } + + /** Built-in BITMAP_AND_CARDINALITY_AGG aggregate function that returns cardinality. */ + public static final class BitmapAndCardinalityAggFunction + extends AbstractBitmapAndAggFunction<Long> { + + public BitmapAndCardinalityAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapAndAccumulator acc) { + return acc.bitmap == null ? null : acc.bitmap.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java similarity index 70% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java index dd77b3fb339..c2bc3606ec3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java @@ -36,15 +36,15 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_AND_AGG with retraction aggregate function. */ +/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with retraction. */ @Internal -public final class BitmapAndWithRetractAggFunction +public abstract class AbstractBitmapAndWithRetractAggFunction<T> extends BuiltInAggregateFunction< - Bitmap, BitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> { + T, AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> { private final transient DataType valueDataType; - public BitmapAndWithRetractAggFunction(LogicalType valueType) { + public AbstractBitmapAndWithRetractAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -68,16 +68,11 @@ public final class BitmapAndWithRetractAggFunction DataTypes.INT().notNull(), DataTypes.INT().notNull()))); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_AND_AGG with retraction. */ + /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with retraction. */ public static class BitmapAndWithRetractAccumulator { public int bitmapCount = 0; @@ -111,26 +106,6 @@ public final class BitmapAndWithRetractAggFunction acc.valueCount.clear(); } - @Override - public Bitmap getValue(BitmapAndWithRetractAccumulator acc) { - if (acc.bitmapCount <= 0) { - return null; - } - - RoaringBitmapData bitmap = RoaringBitmapData.empty(); - try { - for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) { - if (entry.getValue() == acc.bitmapCount) { - bitmap.add(entry.getKey()); - } - } - } catch (Exception e) { - throw new FlinkRuntimeException(e); - } - - return bitmap; - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -208,4 +183,79 @@ public final class BitmapAndWithRetractAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_AND_AGG with retraction aggregate function that returns bitmap. */ + public static final class BitmapAndWithRetractAggFunction + extends AbstractBitmapAndWithRetractAggFunction<Bitmap> { + + public BitmapAndWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapAndWithRetractAccumulator acc) { + if (acc.bitmapCount <= 0) { + return null; + } + + RoaringBitmapData bitmap = RoaringBitmapData.empty(); + try { + for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) { + if (entry.getValue() == acc.bitmapCount) { + bitmap.add(entry.getKey()); + } + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + + return bitmap; + } + } + + /** + * Built-in BITMAP_AND_CARDINALITY_AGG with retraction aggregate function that returns + * cardinality. + */ + public static final class BitmapAndCardinalityWithRetractAggFunction + extends AbstractBitmapAndWithRetractAggFunction<Long> { + + public BitmapAndCardinalityWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapAndWithRetractAccumulator acc) { + if (acc.bitmapCount <= 0) { + return null; + } + + long cardinality = 0L; + try { + for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) { + if (entry.getValue() == acc.bitmapCount) { + cardinality++; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + + return cardinality; + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java similarity index 63% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java index 4e0215019e8..3d966102cf5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java @@ -32,13 +32,14 @@ import java.util.List; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_BUILD_AGG aggregate function. */ +/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG. */ @Internal -public final class BitmapBuildAggFunction extends BuiltInAggregateFunction<Bitmap, Bitmap> { +public abstract class AbstractBitmapBuildAggFunction<T> + extends BuiltInAggregateFunction<T, Bitmap> { private final transient DataType valueDataType; - public BitmapBuildAggFunction(LogicalType valueType) { + public AbstractBitmapBuildAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -56,11 +57,6 @@ public final class BitmapBuildAggFunction extends BuiltInAggregateFunction<Bitma return DataTypes.BITMAP().notNull(); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- @@ -74,11 +70,6 @@ public final class BitmapBuildAggFunction extends BuiltInAggregateFunction<Bitma acc.clear(); } - @Override - public Bitmap getValue(Bitmap acc) { - return acc.isEmpty() ? null : RoaringBitmapData.from(acc); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -94,4 +85,46 @@ public final class BitmapBuildAggFunction extends BuiltInAggregateFunction<Bitma acc.or(other); } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_BUILD_AGG aggregate function that returns bitmap. */ + public static final class BitmapBuildAggFunction + extends AbstractBitmapBuildAggFunction<Bitmap> { + + public BitmapBuildAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(Bitmap acc) { + return acc.isEmpty() ? null : RoaringBitmapData.from(acc); + } + } + + /** Built-in BITMAP_BUILD_CARDINALITY_AGG aggregate function that returns cardinality. */ + public static final class BitmapBuildCardinalityAggFunction + extends AbstractBitmapBuildAggFunction<Long> { + + public BitmapBuildCardinalityAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(Bitmap acc) { + return acc.isEmpty() ? null : acc.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java similarity index 77% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java index c9599b7f31e..fb9c040e264 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java @@ -35,15 +35,15 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_BUILD_AGG with retraction aggregate function. */ +/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG with retraction. */ @Internal -public final class BitmapBuildWithRetractAggFunction +public abstract class AbstractBitmapBuildWithRetractAggFunction<T> extends BuiltInAggregateFunction< - Bitmap, BitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> { + T, AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> { private final transient DataType valueDataType; - public BitmapBuildWithRetractAggFunction(LogicalType valueType) { + public AbstractBitmapBuildWithRetractAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -67,16 +67,11 @@ public final class BitmapBuildWithRetractAggFunction DataTypes.INT().notNull(), DataTypes.INT().notNull()))); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_BUILD_AGG with retraction. */ + /** Accumulator for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG with retraction. */ public static class BitmapBuildWithRetractAccumulator { // bitmap should reflect the actual data based on valueCount @@ -112,11 +107,6 @@ public final class BitmapBuildWithRetractAggFunction acc.valueCount.clear(); } - @Override - public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) { - return acc.bitmap.isEmpty() ? null : RoaringBitmapData.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -196,4 +186,49 @@ public final class BitmapBuildWithRetractAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_BUILD_AGG with retraction aggregate function that returns bitmap. */ + public static final class BitmapBuildWithRetractAggFunction + extends AbstractBitmapBuildWithRetractAggFunction<Bitmap> { + + public BitmapBuildWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) { + return acc.bitmap.isEmpty() ? null : RoaringBitmapData.from(acc.bitmap); + } + } + + /** + * Built-in BITMAP_BUILD_CARDINALITY_AGG with retraction aggregate function that returns + * cardinality. + */ + public static final class BitmapBuildCardinalityWithRetractAggFunction + extends AbstractBitmapBuildWithRetractAggFunction<Long> { + + public BitmapBuildCardinalityWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapBuildWithRetractAccumulator acc) { + return acc.bitmap.isEmpty() ? null : acc.bitmap.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java similarity index 69% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java index 19bdd5db219..65c26029ea7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java @@ -33,14 +33,14 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_OR_AGG aggregate function. */ +/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */ @Internal -public final class BitmapOrAggFunction - extends BuiltInAggregateFunction<Bitmap, BitmapOrAggFunction.BitmapOrAccumulator> { +public abstract class AbstractBitmapOrAggFunction<T> + extends BuiltInAggregateFunction<T, AbstractBitmapOrAggFunction.BitmapOrAccumulator> { private final transient DataType valueDataType; - public BitmapOrAggFunction(LogicalType valueType) { + public AbstractBitmapOrAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -59,16 +59,11 @@ public final class BitmapOrAggFunction BitmapOrAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_OR_AGG. */ + /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */ public static class BitmapOrAccumulator { public @Nullable RoaringBitmapData bitmap; @@ -81,8 +76,7 @@ public final class BitmapOrAggFunction if (obj == null || getClass() != obj.getClass()) { return false; } - BitmapOrAggFunction.BitmapOrAccumulator that = - (BitmapOrAggFunction.BitmapOrAccumulator) obj; + BitmapOrAccumulator that = (BitmapOrAccumulator) obj; return Objects.equals(bitmap, that.bitmap); } @@ -101,11 +95,6 @@ public final class BitmapOrAggFunction acc.bitmap = null; } - @Override - public Bitmap getValue(BitmapOrAccumulator acc) { - return Bitmap.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -133,4 +122,45 @@ public final class BitmapOrAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_OR_AGG aggregate function that returns bitmap. */ + public static final class BitmapOrAggFunction extends AbstractBitmapOrAggFunction<Bitmap> { + + public BitmapOrAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapOrAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + } + + /** Built-in BITMAP_OR_CARDINALITY_AGG aggregate function that returns cardinality. */ + public static final class BitmapOrCardinalityAggFunction + extends AbstractBitmapOrAggFunction<Long> { + + public BitmapOrCardinalityAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapOrAccumulator acc) { + return acc.bitmap == null ? null : acc.bitmap.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java similarity index 81% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java index d4299c21511..af807c18e4e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java @@ -36,15 +36,15 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_OR_AGG with retraction aggregate function. */ +/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with retraction. */ @Internal -public final class BitmapOrWithRetractAggFunction +public abstract class AbstractBitmapOrWithRetractAggFunction<T> extends BuiltInAggregateFunction< - Bitmap, BitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> { + T, AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> { private final transient DataType valueDataType; - public BitmapOrWithRetractAggFunction(LogicalType valueType) { + public AbstractBitmapOrWithRetractAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -69,16 +69,11 @@ public final class BitmapOrWithRetractAggFunction DataTypes.INT().notNull(), DataTypes.INT().notNull()))); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_OR_AGG with retraction. */ + /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with retraction. */ public static class BitmapOrWithRetractAccumulator { public int bitmapCount = 0; @@ -117,11 +112,6 @@ public final class BitmapOrWithRetractAggFunction acc.valueCount.clear(); } - @Override - public Bitmap getValue(BitmapOrWithRetractAccumulator acc) { - return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -226,4 +216,49 @@ public final class BitmapOrWithRetractAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_OR_AGG with retraction aggregate function that returns bitmap. */ + public static final class BitmapOrWithRetractAggFunction + extends AbstractBitmapOrWithRetractAggFunction<Bitmap> { + + public BitmapOrWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapOrWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); + } + } + + /** + * Built-in BITMAP_OR_CARDINALITY_AGG with retraction aggregate function that returns + * cardinality. + */ + public static final class BitmapOrCardinalityWithRetractAggFunction + extends AbstractBitmapOrWithRetractAggFunction<Long> { + + public BitmapOrCardinalityWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapOrWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : acc.bitmap.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java similarity index 68% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java index 766362f424d..3890d257192 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java @@ -33,14 +33,14 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_XOR_AGG aggregate function. */ +/** Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG. */ @Internal -public final class BitmapXorAggFunction - extends BuiltInAggregateFunction<Bitmap, BitmapXorAggFunction.BitmapXorAccumulator> { +public abstract class AbstractBitmapXorAggFunction<T> + extends BuiltInAggregateFunction<T, AbstractBitmapXorAggFunction.BitmapXorAccumulator> { private final transient DataType valueDataType; - public BitmapXorAggFunction(LogicalType valueType) { + public AbstractBitmapXorAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -59,16 +59,11 @@ public final class BitmapXorAggFunction BitmapXorAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_XOR_AGG. */ + /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG. */ public static class BitmapXorAccumulator { public @Nullable RoaringBitmapData bitmap; @@ -81,8 +76,7 @@ public final class BitmapXorAggFunction if (obj == null || getClass() != obj.getClass()) { return false; } - BitmapXorAggFunction.BitmapXorAccumulator that = - (BitmapXorAggFunction.BitmapXorAccumulator) obj; + BitmapXorAccumulator that = (BitmapXorAccumulator) obj; return Objects.equals(bitmap, that.bitmap); } @@ -101,11 +95,6 @@ public final class BitmapXorAggFunction acc.bitmap = null; } - @Override - public Bitmap getValue(BitmapXorAccumulator acc) { - return Bitmap.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -133,4 +122,45 @@ public final class BitmapXorAggFunction } } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_XOR_AGG aggregate function that returns bitmap. */ + public static final class BitmapXorAggFunction extends AbstractBitmapXorAggFunction<Bitmap> { + + public BitmapXorAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapXorAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + } + + /** Built-in BITMAP_XOR_CARDINALITY_AGG aggregate function that returns cardinality. */ + public static final class BitmapXorCardinalityAggFunction + extends AbstractBitmapXorAggFunction<Long> { + + public BitmapXorCardinalityAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapXorAccumulator acc) { + return acc.bitmap == null ? null : acc.bitmap.getLongCardinality(); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java similarity index 69% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java index d30e52beebb..fed8174dc34 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java @@ -33,15 +33,15 @@ import java.util.Objects; import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; -/** Built-in BITMAP_XOR_AGG with retraction aggregate function. */ +/** Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with retraction. */ @Internal -public final class BitmapXorWithRetractAggFunction +public abstract class AbstractBitmapXorWithRetractAggFunction<T> extends BuiltInAggregateFunction< - Bitmap, BitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> { + T, AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> { private final transient DataType valueDataType; - public BitmapXorWithRetractAggFunction(LogicalType valueType) { + public AbstractBitmapXorWithRetractAggFunction(LogicalType valueType) { this.valueDataType = toInternalDataType(valueType); } @@ -62,16 +62,11 @@ public final class BitmapXorWithRetractAggFunction DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull())); } - @Override - public DataType getOutputDataType() { - return DataTypes.BITMAP(); - } - // -------------------------------------------------------------------------------------------- // Accumulator // -------------------------------------------------------------------------------------------- - /** Accumulator for BITMAP_XOR_AGG with retraction. */ + /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with retraction. */ public static class BitmapXorWithRetractAccumulator { public int bitmapCount = 0; @@ -105,11 +100,6 @@ public final class BitmapXorWithRetractAggFunction acc.bitmap.clear(); } - @Override - public Bitmap getValue(BitmapXorWithRetractAccumulator acc) { - return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); - } - // -------------------------------------------------------------------------------------------- // Runtime // -------------------------------------------------------------------------------------------- @@ -139,4 +129,49 @@ public final class BitmapXorWithRetractAggFunction acc.bitmap.xor(other.bitmap); } } + + // -------------------------------------------------------------------------------------------- + // Sub-classes + // -------------------------------------------------------------------------------------------- + + /** Built-in BITMAP_XOR_AGG with retraction aggregate function that returns bitmap. */ + public static final class BitmapXorWithRetractAggFunction + extends AbstractBitmapXorWithRetractAggFunction<Bitmap> { + + public BitmapXorWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + @Override + public Bitmap getValue(BitmapXorWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); + } + } + + /** + * Built-in BITMAP_XOR_CARDINALITY_AGG with retraction aggregate function that returns + * cardinality. + */ + public static final class BitmapXorCardinalityWithRetractAggFunction + extends AbstractBitmapXorWithRetractAggFunction<Long> { + + public BitmapXorCardinalityWithRetractAggFunction(LogicalType valueType) { + super(valueType); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BIGINT(); + } + + @Override + public Long getValue(BitmapXorWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : acc.bitmap.getLongCardinality(); + } + } }
