This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 26046c653a6d70a1386fc69671fae448ba072fb6 Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 02:02:12 2026 +0800 [FLINK-39187][table] Add the built-in function BITMAP_AND_AGG, BITMAP_OR_AGG, BITMAP_XOR_AGG --- docs/data/sql_functions.yml | 28 ++ docs/data/sql_functions_zh.yml | 28 ++ .../docs/reference/pyflink.table/expressions.rst | 3 + flink-python/pyflink/table/expression.py | 30 ++ .../pyflink/table/tests/test_expression.py | 3 + .../flink/table/api/internal/BaseExpressions.java | 36 +++ .../functions/BuiltInFunctionDefinitions.java | 36 +++ .../planner/plan/utils/AggFunctionFactory.scala | 36 +++ .../planner/functions/BitmapAggFunctionITCase.java | 310 ++++++++++++++++++++- .../runtime/batch/sql/OverAggregateITCase.scala | 30 ++ .../runtime/stream/sql/AggregateITCase.scala | 82 ++++++ .../runtime/stream/sql/OverAggregateITCase.scala | 40 +++ .../runtime/stream/sql/WindowAggregateITCase.scala | 28 ++ .../runtime/stream/table/AggregateITCase.scala | 41 +++ .../functions/aggregate/BitmapAndAggFunction.java | 136 +++++++++ .../aggregate/BitmapAndWithRetractAggFunction.java | 211 ++++++++++++++ .../functions/aggregate/BitmapOrAggFunction.java | 136 +++++++++ .../aggregate/BitmapOrWithRetractAggFunction.java | 229 +++++++++++++++ .../functions/aggregate/BitmapXorAggFunction.java | 136 +++++++++ .../aggregate/BitmapXorWithRetractAggFunction.java | 142 ++++++++++ 20 files changed, 1717 insertions(+), 4 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index fd7fc55c949..f1d6c30a418 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1569,6 +1569,16 @@ aggregate: ``` bitmapagg: + - sql: BITMAP_AND_AGG(bitmap) + table: bitmap.bitmapAndAgg() + description: | + Aggregates the AND (intersection) of multiple bitmaps. + + NOTE: The retraction variant of this function may have significant performance overhead with large bitmaps. + + `bitmap BITMAP` + + Returns a `BITMAP`. 
- sql: BITMAP_BUILD_AGG(value) table: value.bitmapBuildAgg() description: | @@ -1576,6 +1586,24 @@ bitmapagg: `value INT` + Returns a `BITMAP`. + - sql: BITMAP_OR_AGG(bitmap) + table: bitmap.bitmapOrAgg() + description: | + Aggregates the OR (union) of multiple bitmaps. + + NOTE: The retraction variant of this function may have significant performance overhead with large bitmaps. + + `bitmap BITMAP` + + Returns a `BITMAP`. + - sql: BITMAP_XOR_AGG(bitmap) + table: bitmap.bitmapXorAgg() + description: | + Aggregates the XOR (symmetric difference) of multiple bitmaps. + + `bitmap BITMAP` + Returns a `BITMAP`. catalog: diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index c74b830968a..8278e89677f 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1653,6 +1653,16 @@ aggregate: ``` bitmapagg: + - sql: BITMAP_AND_AGG(bitmap) + table: bitmap.bitmapAndAgg() + description: | + 聚合多个位图的交集 (AND)。 + + 注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。 + + `bitmap BITMAP` + + 返回一个 `BITMAP`。 - sql: BITMAP_BUILD_AGG(value) table: value.bitmapBuildAgg() description: | @@ -1660,6 +1670,24 @@ bitmapagg: `value INT` + 返回一个 `BITMAP`。 + - sql: BITMAP_OR_AGG(bitmap) + table: bitmap.bitmapOrAgg() + description: | + 聚合多个位图的并集 (OR)。 + + 注意:该函数的回撤版本在处理大位图时可能有显著的性能开销。 + + `bitmap BITMAP` + + 返回一个 `BITMAP`。 + - sql: BITMAP_XOR_AGG(bitmap) + table: bitmap.bitmapXorAgg() + description: | + 聚合多个位图的异或 (XOR)。 + + `bitmap BITMAP` + 返回一个 `BITMAP`。 catalog: diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 727f5e979b0..c54edf08ca2 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -347,12 +347,15 @@ Bitmap functions Expression.bitmap_and Expression.bitmap_andnot + Expression.bitmap_and_agg Expression.bitmap_build Expression.bitmap_build_agg Expression.bitmap_cardinality 
Expression.bitmap_from_bytes Expression.bitmap_or + Expression.bitmap_or_agg Expression.bitmap_to_array Expression.bitmap_to_bytes Expression.bitmap_to_string Expression.bitmap_xor + Expression.bitmap_xor_agg diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 9241bbadf55..ff3628f0716 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2299,6 +2299,17 @@ class Expression(Generic[T]): """ return _binary_op("bitmapAndnot")(self, bitmap2) + def bitmap_and_agg(self): + """ + Aggregates the AND (intersection) of multiple bitmaps. + + NOTE: The retraction variant of this function may have significant performance overhead + with large bitmaps. + + :return: a BITMAP expression + """ + return _unary_op("bitmapAndAgg")(self) + def bitmap_build(self) -> 'Expression': """ Creates a bitmap from an array of 32-bit integers. @@ -2351,6 +2362,17 @@ class Expression(Generic[T]): """ return _binary_op("bitmapOr")(self, bitmap2) + def bitmap_or_agg(self): + """ + Aggregates the OR (union) of multiple bitmaps. + + NOTE: The retraction variant of this function may have significant performance overhead + with large bitmaps. + + :return: a BITMAP expression + """ + return _unary_op("bitmapOrAgg")(self) + def bitmap_to_array(self) -> 'Expression': """ Converts a bitmap to an array of 32-bit integers, the values are sorted by \ @@ -2403,6 +2425,14 @@ class Expression(Generic[T]): """ return _binary_op("bitmapXor")(self, bitmap2) + def bitmap_xor_agg(self): + """ + Aggregates the XOR (symmetric difference) of multiple bitmaps. 
+ + :return: a BITMAP expression + """ + return _unary_op("bitmapXorAgg")(self) + # add the docs _make_math_log_doc() diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index f6d10fc99c5..74329037bd0 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -268,15 +268,18 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): # bitmap functions self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2))) self.assertEqual("BITMAP_ANDNOT(a, b)", str(expr1.bitmap_andnot(expr2))) + self.assertEqual("BITMAP_AND_AGG(a)", str(expr1.bitmap_and_agg())) self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg())) self.assertEqual("BITMAP_CARDINALITY(a)", str(expr1.bitmap_cardinality())) self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2))) + self.assertEqual("BITMAP_OR_AGG(a)", str(expr1.bitmap_or_agg())) self.assertEqual("BITMAP_TO_ARRAY(a)", str(expr1.bitmap_to_array())) self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes())) self.assertEqual("BITMAP_TO_STRING(a)", str(expr1.bitmap_to_string())) self.assertEqual("BITMAP_XOR(a, b)", str(expr1.bitmap_xor(expr2))) + self.assertEqual("BITMAP_XOR_AGG(a)", str(expr1.bitmap_xor_agg())) def test_expressions(self): expr1 = col('a') diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 086ef39867e..90735d6f7fc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -78,15 +78,18 @@ 
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEE import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_ARRAY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_STRING; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; @@ -2641,6 +2644,18 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(BITMAP_ANDNOT, toExpr(), objectToExpression(bitmap2))); } + /** + * Aggregates the AND (intersection) of multiple bitmaps. 
+ * + * <p>NOTE: The retraction variant of this function may have significant performance overhead + * with large bitmaps. + * + * @return a BITMAP expression + */ + public OutType bitmapAndAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_AND_AGG, toExpr())); + } + /** * Creates a bitmap from an array of 32-bit integers. * @@ -2700,6 +2715,18 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(BITMAP_OR, toExpr(), objectToExpression(bitmap2))); } + /** + * Aggregates the OR (union) of multiple bitmaps. + * + * <p>NOTE: The retraction variant of this function may have significant performance overhead + * with large bitmaps. + * + * @return a BITMAP expression + */ + public OutType bitmapOrAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_OR_AGG, toExpr())); + } + /** * Converts a bitmap to an array of 32-bit integers, the values are sorted by {@link * Integer#compareUnsigned}. @@ -2759,4 +2786,13 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression( unresolvedCall(BITMAP_XOR, toExpr(), objectToExpression(bitmap2))); } + + /** + * Aggregates the XOR (symmetric difference) of multiple bitmaps. 
+ * + * @return a BITMAP expression + */ + public OutType bitmapXorAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_AGG, toExpr())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 44c40421e30..f20f871baf0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3015,6 +3015,18 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapAndnotFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_AND_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_AND_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_BUILD") @@ -3091,6 +3103,18 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapOrFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_OR_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_OR_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_TO_ARRAY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_TO_ARRAY") @@ -3145,6 +3169,18 @@ public final 
class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapXorFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_XOR_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_XOR_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 3eecd384abf..20bedac6c13 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -176,6 +176,12 @@ class AggFunctionFactory( createPercentileAggFunction(argTypes) case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG => createBitmapBuildAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_AND_AGG => + createBitmapAndAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_OR_AGG => + createBitmapOrAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_XOR_AGG => + createBitmapXorAggFunction(argTypes, index) // DeclarativeAggregateFunction & UDF case _ => bridge.getDefinition.asInstanceOf[UserDefinedFunction] @@ -661,4 +667,34 @@ class AggFunctionFactory( new BitmapBuildAggFunction(argTypes(0)) } } + + private def createBitmapAndAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction 
= { + if (aggCallNeedRetractions(index)) { + new BitmapAndWithRetractAggFunction(argTypes(0)) + } else { + new BitmapAndAggFunction(argTypes(0)) + } + } + + private def createBitmapOrAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new BitmapOrWithRetractAggFunction(argTypes(0)) + } else { + new BitmapOrAggFunction(argTypes(0)) + } + } + + private def createBitmapXorAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new BitmapXorWithRetractAggFunction(argTypes(0)) + } else { + new BitmapXorAggFunction(argTypes(0)) + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java index 8312da400e3..cbd79bfea2a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java @@ -28,12 +28,14 @@ import java.util.Collections; import java.util.List; import java.util.stream.Stream; +import static org.apache.flink.table.api.DataTypes.ARRAY; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BITMAP; import static org.apache.flink.table.api.DataTypes.INT; import static org.apache.flink.table.api.DataTypes.ROW; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.array; import static org.apache.flink.types.RowKind.DELETE; import static org.apache.flink.types.RowKind.INSERT; import static org.apache.flink.types.RowKind.UPDATE_AFTER; @@ -45,10 +47,109 @@ class BitmapAggFunctionITCase 
extends BuiltInAggregateFunctionTestBase { @Override Stream<TestSpec> getTestCaseSpecs() { final List<TestSpec> specs = new ArrayList<>(); + specs.addAll(bitmapAndAggTestCases()); specs.addAll(bitmapBuildAggTestCases()); + specs.addAll(bitmapOrAggTestCases()); + specs.addAll(bitmapXorAggTestCases()); return specs.stream(); } + private List<TestSpec> bitmapAndAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"))) + .testResult( + source -> + "SELECT f1, BITMAP_AND_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapAndAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", fromArray(3)), + Row.of("B", fromArray(4, 6)), + Row.of("C", fromArray(-1)), + Row.of("D", null))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(2, 4, 6), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, 
fromArray(2, 3, 4, 5, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, fromArray(1, 4, 7), "B"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_AND_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapAndAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", null), + Row.of("B", fromArray(3, 4)), + Row.of("C", fromArray(1, 2)))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + .testValidationError( + source -> + "SELECT f2, BITMAP_AND_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapAndAgg()), + "Invalid input arguments. 
Expected signatures are:\n" + + "BITMAP_AND_AGG(bitmap <BITMAP>)")); + } + private List<TestSpec> bitmapBuildAggTestCases() { return Arrays.asList( TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) @@ -79,8 +180,8 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { ROW(STRING(), BITMAP()), ROW(STRING(), BITMAP()), Arrays.asList( - Row.of("A", Bitmap.fromArray(new int[] {-1, 1, 2, 3, 4})), - Row.of("B", Bitmap.fromArray(new int[] {-1, 1, 2})), + Row.of("A", fromArray(-1, 1, 2, 3, 4)), + Row.of("B", fromArray(-1, 1, 2)), Row.of("C", null))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) .withDescription("with retraction") @@ -124,8 +225,8 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { ROW(STRING(), BITMAP()), Arrays.asList( Row.of("A", null), - Row.of("B", Bitmap.fromArray(new int[] {-1, 1, 2})), - Row.of("C", Bitmap.fromArray(new int[] {1})))), + Row.of("B", fromArray(-1, 1, 2)), + Row.of("C", fromArray(1)))), TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) .withDescription("Validation Error") .withSource( @@ -143,4 +244,205 @@ class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { "Invalid input arguments. 
Expected signatures are:\n" + "BITMAP_BUILD_AGG(value <INTEGER>)")); } + + private List<TestSpec> bitmapOrAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"))) + .testResult( + source -> + "SELECT f1, BITMAP_OR_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapOrAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", fromArray(1, 2, 3, 4, 5)), + Row.of( + "B", + Bitmap.fromArray( + new int[] {1, 2, 4, 6, 8, 12, 16})), + Row.of("C", fromArray(0, 1, -2, -1)), + Row.of("D", null))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, 
fromArray(1, 4, 7), "B"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_OR_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapOrAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", null), + Row.of("B", fromArray(0, 2, 3, 4, 5, 6, -1)), + Row.of("C", fromArray(1, 2)))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + .testValidationError( + source -> + "SELECT f2, BITMAP_OR_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapOrAgg()), + "Invalid input arguments. 
Expected signatures are:\n" + + "BITMAP_OR_AGG(bitmap <BITMAP>)")); + } + + private List<TestSpec> bitmapXorAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"))) + .testResult( + source -> + "SELECT f1, BITMAP_XOR_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapXorAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", fromArray(3, 4, 5)), + Row.of("B", fromArray(1, 4, 6, 8, 12, 16)), + Row.of("C", fromArray(0, 1, -2)), + Row.of("D", null))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, fromArray(1, 4, 7), "B"), + 
Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_XOR_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapXorAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", null), + Row.of("B", fromArray(0, 3, 4, 6, -1)), + Row.of("C", fromArray(1, 2)))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + .testValidationError( + source -> + "SELECT f2, BITMAP_XOR_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapXorAgg()), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_XOR_AGG(bitmap <BITMAP>)")); + } + + // ~ Utils -------------------------------------------------------------------- + + private Bitmap fromArray(int... 
values) { + return Bitmap.fromArray(values); + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala index c9ca677c07a..69040c961d8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala @@ -3004,6 +3004,36 @@ class OverAggregateITCase extends BatchTestBase { ) ) } + + @Test + def testBitmapLogicalOpsAgg(): Unit = { + checkResult( + "SELECT " + + "d, f, " + + "BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " + + "BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " + + "BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " + + "FROM Table5", + Seq( + row(1, 0, "{0,1}", "{0,1}", "{0,1}"), + row(2, 1, "{1}", "{0,1,2}", "{0,2}"), + row(2, 2, "{}", "{0,1,2}", "{0}"), + row(3, 3, "{}", "{1,2,3}", "{1,3}"), + row(3, 4, "{}", "{2,3,4}", "{2,4}"), + row(3, 5, "{3}", "{3,4,5}", "{3,4,5}"), + row(4, 6, "{}", "{3,4,5,6}", "{5,6}"), + row(4, 7, "{}", "{3,4,5,6,7}", "{3,5,6,7}"), + row(4, 8, "{4}", "{4,6,7,8}", "{4,6,7,8}"), + row(4, 9, "{4}", "{4,7,8,9}", "{4,7,8,9}"), + row(5, 10, "{}", "{4,5,8,9,10}", "{5,8,9,10}"), + row(5, 11, "{}", "{4,5,9,10,11}", "{4,9,10,11}"), + row(5, 12, "{5}", "{5,10,11,12}", "{5,10,11,12}"), + row(5, 13, "{5}", "{5,11,12,13}", "{5,11,12,13}"), + row(5, 14, "{5}", "{5,12,13,14}", "{5,12,13,14}") + ) + ) + } + } /** The initial accumulator for count aggregate function */ diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index cd2adf0154a..fbee1a8585f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -2190,6 +2190,88 @@ class AggregateITCase( val expected = List(s"{4,5,6},{4,8,12}") assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { // negative inputs appear as unsigned 32-bit values, hence Integer.toUnsignedLong(...) in the expected rows + val data = new mutable.MutableList[(Int, Int, Int, String)] + data.+=((-3, 5, 0, "a")) + data.+=((7, 2, 5, "b")) + data.+=((-3, 8, -8, "c")) + data.+=((2, 1, 7, "b")) + data.+=((2, 9, 0, "a")) + data.+=((8, 3, -3, "c")) + data.+=((7, 6, 2, "b")) + data.+=((0, 4, 5, "a")) + data.+=((-3, 7, 8, "c")) + data.+=((5, 0, 2, "b")) + data.+=((0, 10, 5, "a")) + data.+=((2, 5, 0, "a")) + + val sql = + """ + |SELECT + | d, + | CAST(BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING), + | CAST(BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING), + | CAST(BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING) + |FROM MyTable + |GROUP BY d + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = + List( + s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}", + "b,{2},{0,1,2,5,6,7},{0,1,6,7}", + s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}}" + ) +
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapLogicalOpsAggWithRetract(): Unit = { // LAST_VALUE per d changes as rows arrive (i = 3, 2, 1), so the outer aggregates see updated input per d; final inputs are {1,2,3}, {2,3,4}, {3,4,5} + val data = new mutable.MutableList[(Int, Int, Int, String)] + for (i <- 3 until 0 by -1) { + data.+=((i, i + 1, i + 2, "a")) + data.+=((i * 2, i * 2 + 1, i * 2 + 2, "b")) + data.+=((i * 3, i * 3 + 1, i * 3 + 2, "c")) + } + + val inner = + """ + |SELECT + | LAST_VALUE(BITMAP_BUILD(ARRAY[a, b, c])) AS last, + | d + |FROM MyTable + |GROUP BY d + """.stripMargin + val sql = + s""" + |SELECT + | CAST(BITMAP_AND_AGG(last) AS STRING), + | CAST(BITMAP_OR_AGG(last) AS STRING), + | CAST(BITMAP_XOR_AGG(last) AS STRING) + |FROM ($inner) + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List(s"{3},{1,2,3,4,5},{1,3,5}") + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } object AggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala index c4deb3500a9..a1f60de7746 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala @@ -1538,6 +1538,46 @@ class OverAggregateITCase(mode: StateBackendMode, unboundedOverVersion: Int) ) assertThat(sink.getAppendResults).isEqualTo(expected) } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { // window = current row plus up to 2 preceding rows ordered by proctime; each row contributes BITMAP_BUILD(ARRAY[a, c]) + val sql = + """ + |SELECT + | a, c, + | BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND
CURRENT ROW), + | BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), + | BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) + |FROM MyTable + """.stripMargin + + val t = + failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingAppendSink() + tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + "1,0,{0,1},{0,1},{0,1}", + "2,1,{1},{0,1,2},{0,2}", + "2,2,{},{0,1,2},{0}", + "3,3,{},{1,2,3},{1,3}", + "3,4,{},{2,3,4},{2,4}", + "3,5,{3},{3,4,5},{3,4,5}", + "4,6,{},{3,4,5,6},{5,6}", + "4,7,{},{3,4,5,6,7},{3,5,6,7}", + "4,8,{4},{4,6,7,8},{4,6,7,8}", + "4,9,{4},{4,7,8,9},{4,7,8,9}", + "5,10,{},{4,5,8,9,10},{5,8,9,10}", + "5,11,{},{4,5,9,10,11},{4,9,10,11}", + "5,12,{5},{5,10,11,12},{5,10,11,12}", + "5,13,{5},{5,11,12,13},{5,11,12,13}", + "5,14,{5},{5,12,13,14},{5,12,13,14}" + ) + assertThat(sink.getAppendResults).isEqualTo(expected) + } } object OverAggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 0c498882137..52c8ff9d131 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -1116,6 +1116,34 @@ class WindowAggregateITCase( }) .map(_.mkString(",")) } + + @TestTemplate + def testBitmapLogicalOpsAggOnEventTimeTumbleWindow(): Unit = { // 5-second event-time tumbling windows over T1_CDC; each row contributes BITMAP_BUILD(ARRAY[int, 2*int, 4*int]) + val sql = + """ + |SELECT + | window_start, + | window_end, + | BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])), + |
BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])), + | BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])) + |FROM TABLE( + | TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY window_start, window_end + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = List( + "2020-10-10T00:00,2020-10-10T00:00:05,{},{2,4,5,8,10,20,22,44,88},{2,4,5,8,10,20,22,44,88}", + "2020-10-10T00:00:05,2020-10-10T00:00:10,{6,12},{3,6,12,24},{6,12,24}", + "2020-10-10T00:00:15,2020-10-10T00:00:20,{4,8,16},{4,8,16},{4,8,16}" + ) + val result = sink.getAppendResults.sorted + assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n")) + } } object WindowAggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala index 29a6e80a2e7..08e27c452cc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala @@ -635,4 +635,45 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase ) assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { // Table API counterpart of the SQL test in stream/sql/AggregateITCase: same input rows and expected results + val data = new mutable.MutableList[(Int, Int, Int, String)] + data.+=((-3, 5, 0, "a")) + data.+=((7, 2, 5, "b")) + data.+=((-3, 8, -8, "c")) + data.+=((2, 1, 7, "b")) + data.+=((2, 9, 0, "a")) + data.+=((8, 3, -3, "c")) + data.+=((7, 6, 2, "b")) + data.+=((0, 4, 5, "a")) + data.+=((-3, 7, 8, "c")) + data.+=((5, 0, 2, "b")) + data.+=((0, 10, 5, "a")) + data.+=((2, 5, 0, "a")) + + val t = failingDataSource(data) +
.toTable(tEnv, 'a, 'b, 'c, 'd) + .groupBy('d) + .select( + 'd, + array('a, 'b, 'c).bitmapBuild().bitmapAndAgg().cast(DataTypes.STRING()), + array('a, 'b, 'c).bitmapBuild().bitmapOrAgg().cast(DataTypes.STRING()), + array('a, 'b, 'c).bitmapBuild().bitmapXorAgg().cast(DataTypes.STRING()) + ) + + val sink = new TestingRetractSink + t.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = + List( + s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}", + "b,{2},{0,1,2,5,6,7},{0,1,6,7}", + s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java new file mode 100644 index 00000000000..4c1ad8a33b5 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndAggFunction.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_AND_AGG aggregate function: intersection of all non-null input bitmaps.
*/ +@Internal +public final class BitmapAndAggFunction + extends BuiltInAggregateFunction<Bitmap, BitmapAndAggFunction.BitmapAndAccumulator> { + + private final transient DataType valueDataType; + + public BitmapAndAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapAndAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_AND_AGG; {@code bitmap} stays null until the first non-null input.
*/ + public static class BitmapAndAccumulator { + + public @Nullable RoaringBitmapData bitmap; + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapAndAggFunction.BitmapAndAccumulator that = + (BitmapAndAggFunction.BitmapAndAccumulator) obj; + return Objects.equals(bitmap, that.bitmap); + } + + @Override + public int hashCode() { + return Objects.hash(bitmap); + } + } + + @Override + public BitmapAndAccumulator createAccumulator() { + return new BitmapAndAccumulator(); + } + + public void resetAccumulator(BitmapAndAccumulator acc) { + acc.bitmap = null; + } + + @Override + public Bitmap getValue(BitmapAndAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapAndAccumulator acc, @Nullable Bitmap bitmap) { + if (bitmap == null) { + return; + } + + if (acc.bitmap != null) { + acc.bitmap.and(bitmap); + } else { // first non-null input seeds the intersection + acc.bitmap = RoaringBitmapData.from(bitmap); + } + } + + public void merge(BitmapAndAccumulator acc, Iterable<BitmapAndAccumulator> its) { + for (BitmapAndAccumulator other : its) { + if (other.bitmap != null) { + if (acc.bitmap != null) { + acc.bitmap.and(other.bitmap); + } else { + acc.bitmap = RoaringBitmapData.from(other.bitmap); + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java new file mode 100644 index 00000000000..dd77b3fb339 --- /dev/null +++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapAndWithRetractAggFunction.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.dataview.MapView; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_AND_AGG with retraction aggregate function, backed by per-value occurrence counts.
*/ +@Internal +public final class BitmapAndWithRetractAggFunction + extends BuiltInAggregateFunction< + Bitmap, BitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> { + + private final transient DataType valueDataType; + + public BitmapAndWithRetractAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapAndWithRetractAccumulator.class, + DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()), + DataTypes.FIELD( + "valueCount", + MapView.newMapViewDataType( + DataTypes.INT().notNull(), DataTypes.INT().notNull()))); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_AND_AGG with retraction.
*/ + public static class BitmapAndWithRetractAccumulator { + + public int bitmapCount = 0; // net number of non-null bitmaps (accumulated minus retracted) + public MapView<Integer, Integer> valueCount = new MapView<>(); // value -> net occurrence count; entries reaching 0 are removed + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapAndWithRetractAccumulator that = (BitmapAndWithRetractAccumulator) obj; + return bitmapCount == that.bitmapCount && Objects.equals(valueCount, that.valueCount); + } + + @Override + public int hashCode() { + return Objects.hash(bitmapCount, valueCount); + } + } + + @Override + public BitmapAndWithRetractAccumulator createAccumulator() { + return new BitmapAndWithRetractAccumulator(); + } + + public void resetAccumulator(BitmapAndWithRetractAccumulator acc) { + acc.bitmapCount = 0; + acc.valueCount.clear(); + } + + @Override + public Bitmap getValue(BitmapAndWithRetractAccumulator acc) { + if (acc.bitmapCount <= 0) { + return null; + } + + RoaringBitmapData bitmap = RoaringBitmapData.empty(); + try { + for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) { + if (entry.getValue() == acc.bitmapCount) { // value occurs in every counted bitmap + bitmap.add(entry.getKey()); + } + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + + return bitmap; + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap) + throws Exception { + if (bitmap == null) { + return; + } + + acc.bitmapCount++; + + RoaringBitmapData.toRoaringBitmapData(bitmap) + .forEach( + value -> { + try { + Integer count = acc.valueCount.get(value); + count = count == null ?
1 : count + 1; + + if (count == 0) { + acc.valueCount.remove(value); + } else { + acc.valueCount.put(value, count); + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + } + + public void retract(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap) + throws Exception { + if (bitmap == null) { + return; + } + + acc.bitmapCount--; + + RoaringBitmapData.toRoaringBitmapData(bitmap) + .forEach( + value -> { + try { + Integer count = acc.valueCount.get(value); + count = count == null ? -1 : count - 1; // missing entry: retraction without a matching accumulate + + if (count == 0) { + acc.valueCount.remove(value); + } else { + acc.valueCount.put(value, count); + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + } + + public void merge( + BitmapAndWithRetractAccumulator acc, Iterable<BitmapAndWithRetractAccumulator> its) + throws Exception { + for (BitmapAndWithRetractAccumulator other : its) { + acc.bitmapCount += other.bitmapCount; + + for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) { + Integer value = entry.getKey(); + Integer count = entry.getValue(); + + Integer curCount = acc.valueCount.get(value); + curCount = curCount == null ? count : curCount + count; + + if (curCount == 0) { + acc.valueCount.remove(value); + } else { + acc.valueCount.put(value, curCount); + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java new file mode 100644 index 00000000000..19bdd5db219 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrAggFunction.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_OR_AGG aggregate function: union of all non-null input bitmaps.
*/ +@Internal +public final class BitmapOrAggFunction + extends BuiltInAggregateFunction<Bitmap, BitmapOrAggFunction.BitmapOrAccumulator> { + + private final transient DataType valueDataType; + + public BitmapOrAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapOrAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_OR_AGG; {@code bitmap} stays null until the first non-null input.
*/ + public static class BitmapOrAccumulator { + + public @Nullable RoaringBitmapData bitmap; + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapOrAggFunction.BitmapOrAccumulator that = + (BitmapOrAggFunction.BitmapOrAccumulator) obj; + return Objects.equals(bitmap, that.bitmap); + } + + @Override + public int hashCode() { + return Objects.hash(bitmap); + } + } + + @Override + public BitmapOrAccumulator createAccumulator() { + return new BitmapOrAccumulator(); + } + + public void resetAccumulator(BitmapOrAccumulator acc) { + acc.bitmap = null; + } + + @Override + public Bitmap getValue(BitmapOrAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapOrAccumulator acc, @Nullable Bitmap bitmap) { + if (bitmap == null) { + return; + } + + if (acc.bitmap != null) { + acc.bitmap.or(bitmap); + } else { // first non-null input seeds the union + acc.bitmap = RoaringBitmapData.from(bitmap); + } + } + + public void merge(BitmapOrAccumulator acc, Iterable<BitmapOrAccumulator> its) { + for (BitmapOrAccumulator other : its) { + if (other.bitmap != null) { + if (acc.bitmap != null) { + acc.bitmap.or(other.bitmap); + } else { + acc.bitmap = RoaringBitmapData.from(other.bitmap); + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java new file mode 100644 index 00000000000..d4299c21511 --- /dev/null +++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapOrWithRetractAggFunction.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.dataview.MapView; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_OR_AGG with retraction aggregate function, backed by per-value occurrence counts.
*/ +@Internal +public final class BitmapOrWithRetractAggFunction + extends BuiltInAggregateFunction< + Bitmap, BitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> { + + private final transient DataType valueDataType; + + public BitmapOrWithRetractAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapOrWithRetractAccumulator.class, + DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()), + DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()), + DataTypes.FIELD( + "valueCount", + MapView.newMapViewDataType( + DataTypes.INT().notNull(), DataTypes.INT().notNull()))); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_OR_AGG with retraction.
*/ + public static class BitmapOrWithRetractAccumulator { + + public int bitmapCount = 0; // net number of non-null bitmaps (accumulated minus retracted) + // materialized result, maintained as exactly the values whose count in valueCount is positive + public RoaringBitmapData bitmap = RoaringBitmapData.empty(); + public MapView<Integer, Integer> valueCount = new MapView<>(); // value -> net occurrence count; entries reaching 0 are removed + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapOrWithRetractAccumulator that = (BitmapOrWithRetractAccumulator) obj; + return bitmapCount == that.bitmapCount + && Objects.equals(bitmap, that.bitmap) + && Objects.equals(valueCount, that.valueCount); + } + + @Override + public int hashCode() { + return Objects.hash(bitmapCount, bitmap, valueCount); + } + } + + @Override + public BitmapOrWithRetractAccumulator createAccumulator() { + return new BitmapOrWithRetractAccumulator(); + } + + public void resetAccumulator(BitmapOrWithRetractAccumulator acc) { + acc.bitmapCount = 0; + acc.bitmap.clear(); + acc.valueCount.clear(); + } + + @Override + public Bitmap getValue(BitmapOrWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap) + throws Exception { + if (bitmap == null) { + return; + } + + acc.bitmapCount++; + + RoaringBitmapData.toRoaringBitmapData(bitmap) + .forEach( + value -> { + try { + Integer count = acc.valueCount.get(value); + count = count == null ?
1 : count + 1; + + if (count == 0) { + acc.valueCount.remove(value); + } else { + acc.valueCount.put(value, count); + // count went from 0 (absent, possibly because the entry expired) to 1: value enters the result + if (count == 1) { + acc.bitmap.add(value); + } + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + } + + public void retract(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap) + throws Exception { + if (bitmap == null) { + return; + } + + acc.bitmapCount--; + + RoaringBitmapData.toRoaringBitmapData(bitmap) + .forEach( + value -> { + try { + Integer count = acc.valueCount.get(value); + count = count == null ? -1 : count - 1; + + if (count == 0) { + acc.valueCount.remove(value); + // remove value from bitmap if count changes from 1 to 0 + acc.bitmap.remove(value); + } else { + acc.valueCount.put(value, count); + if (count == -1) { + // count went from absent to -1 (no live entry, possibly expired): ensure value is not in the result + acc.bitmap.remove(value); + } + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + } + + public void merge( + BitmapOrWithRetractAccumulator acc, Iterable<BitmapOrWithRetractAccumulator> its) + throws Exception { + for (BitmapOrWithRetractAccumulator other : its) { + acc.bitmapCount += other.bitmapCount; + for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) { + Integer value = entry.getKey(); + // count != 0 + Integer count = entry.getValue(); + + Integer curCount = acc.valueCount.get(value); + curCount = curCount == null ?
count : curCount + count; + + if (curCount == 0) { + acc.valueCount.remove(value); + + if (curCount > count) { + // curCount == 0 here, so preCount = -count > 0: value is in bitmap + acc.bitmap.remove(value); + } + } else { + acc.valueCount.put(value, curCount); + + if (0 < curCount && curCount <= count) { + // preCount (= curCount - count) < 0, value is not in bitmap + // preCount = 0, unknown (expiration) + acc.bitmap.add(value); + } + + if (0 > curCount && curCount >= count) { + // preCount (= curCount - count) > 0, value is in bitmap + // preCount = 0, unknown (expiration) + acc.bitmap.remove(value); + } + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java new file mode 100644 index 00000000000..766362f424d --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorAggFunction.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_XOR_AGG aggregate function: values present in an odd number of the non-null input bitmaps. */ +@Internal +public final class BitmapXorAggFunction + extends BuiltInAggregateFunction<Bitmap, BitmapXorAggFunction.BitmapXorAccumulator> { + + private final transient DataType valueDataType; + + public BitmapXorAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapXorAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP())); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_XOR_AGG; {@code bitmap} stays null until the first non-null input.
*/ + public static class BitmapXorAccumulator { + + public @Nullable RoaringBitmapData bitmap; + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapXorAggFunction.BitmapXorAccumulator that = + (BitmapXorAggFunction.BitmapXorAccumulator) obj; + return Objects.equals(bitmap, that.bitmap); + } + + @Override + public int hashCode() { + return Objects.hash(bitmap); + } + } + + @Override + public BitmapXorAccumulator createAccumulator() { + return new BitmapXorAccumulator(); + } + + public void resetAccumulator(BitmapXorAccumulator acc) { + acc.bitmap = null; + } + + @Override + public Bitmap getValue(BitmapXorAccumulator acc) { + return Bitmap.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapXorAccumulator acc, @Nullable Bitmap bitmap) { + if (bitmap == null) { + return; + } + + if (acc.bitmap != null) { + acc.bitmap.xor(bitmap); + } else { // first non-null input seeds the result + acc.bitmap = RoaringBitmapData.from(bitmap); + } + } + + public void merge(BitmapXorAccumulator acc, Iterable<BitmapXorAccumulator> its) { + for (BitmapXorAccumulator other : its) { + if (other.bitmap != null) { + if (acc.bitmap != null) { + acc.bitmap.xor(other.bitmap); + } else { + acc.bitmap = RoaringBitmapData.from(other.bitmap); + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java new file mode 100644 index 00000000000..d30e52beebb --- /dev/null +++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapXorWithRetractAggFunction.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_XOR_AGG with retraction aggregate function. 
*/ +@Internal +public final class BitmapXorWithRetractAggFunction + extends BuiltInAggregateFunction< + Bitmap, BitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> { + + private final transient DataType valueDataType; + + public BitmapXorWithRetractAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapXorWithRetractAccumulator.class, + DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()), + DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull())); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_XOR_AGG with retraction. 
*/ + public static class BitmapXorWithRetractAccumulator { + + public int bitmapCount = 0; + public RoaringBitmapData bitmap = RoaringBitmapData.empty(); + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapXorWithRetractAccumulator that = (BitmapXorWithRetractAccumulator) obj; + return bitmapCount == that.bitmapCount && Objects.equals(bitmap, that.bitmap); + } + + @Override + public int hashCode() { + return Objects.hash(bitmapCount, bitmap); + } + } + + @Override + public BitmapXorWithRetractAccumulator createAccumulator() { + return new BitmapXorWithRetractAccumulator(); + } + + public void resetAccumulator(BitmapXorWithRetractAccumulator acc) { + acc.bitmapCount = 0; + acc.bitmap.clear(); + } + + @Override + public Bitmap getValue(BitmapXorWithRetractAccumulator acc) { + return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) { + if (bitmap == null) { + return; + } + + acc.bitmapCount++; + acc.bitmap.xor(bitmap); + } + + public void retract(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) { + if (bitmap == null) { + return; + } + + acc.bitmapCount--; + acc.bitmap.xor(bitmap); + } + + public void merge( + BitmapXorWithRetractAccumulator acc, Iterable<BitmapXorWithRetractAccumulator> its) { + for (BitmapXorWithRetractAccumulator other : its) { + acc.bitmapCount += other.bitmapCount; + acc.bitmap.xor(other.bitmap); + } + } +}
