This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20af677612fd767dfcb062b0107b25dab54e3cf3 Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 01:44:13 2026 +0800 [FLINK-39186][table] Add the built-in function BITMAP_CARDINALITY --- docs/data/sql_functions.yml | 8 +++ docs/data/sql_functions_zh.yml | 8 +++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 10 ++++ .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 12 ++++ .../functions/BuiltInFunctionDefinitions.java | 13 +++++ .../planner/functions/BitmapFunctionsITCase.java | 68 ++++++++++++++++++++++ .../scalar/BitmapCardinalityFunction.java | 42 +++++++++++++ 9 files changed, 163 insertions(+) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 56d5491b77e..3b5e5a48d25 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1358,6 +1358,14 @@ bitmap: `array ARRAY<INT>` Returns a `BITMAP`. `NULL` if the argument is `NULL`. + - sql: BITMAP_CARDINALITY(bitmap) + table: bitmap.bitmapCardinality() + description: | + Returns the cardinality of a bitmap. + + `bitmap BITMAP` + + Returns a `BIGINT`. `NULL` if the argument is `NULL`. - sql: BITMAP_FROM_BYTES(bytes) table: bytes.bitmapFromBytes() description: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 1b56accd1d0..9cf9602a26c 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1444,6 +1444,14 @@ bitmap: `array ARRAY<INT>` 返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_CARDINALITY(bitmap) + table: bitmap.bitmapCardinality() + description: | + 返回位图的基数。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。如果参数为 `NULL`,则返回 `NULL`。 - sql: BITMAP_FROM_BYTES(bytes) table: bytes.bitmapFromBytes() description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 61bf4a8eef3..21a70cb62e8 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -346,5 +346,6 @@ Bitmap functions :toctree: api/ Expression.bitmap_build + Expression.bitmap_cardinality Expression.bitmap_from_bytes Expression.bitmap_to_bytes diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 8a26a887b45..5d9c766f4c8 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2287,6 +2287,16 @@ class Expression(Generic[T]): """ return _unary_op("bitmapBuild")(self) + def bitmap_cardinality(self) -> 'Expression': + """ + Returns the cardinality of a bitmap. + + If the input is null, the result is null. + + :return: a BIGINT expression + """ + return _unary_op("bitmapCardinality")(self) + def bitmap_from_bytes(self) -> 'Expression': """ Converts an array of bytes to a bitmap. diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index c6385bd488f..1db5b8af8cd 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -267,6 +267,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): # bitmap functions self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) + self.assertEqual("BITMAP_CARDINALITY(a)", str(expr1.bitmap_cardinality())) self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes())) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index c742b7ca958..07f4e85837c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -77,6 +77,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; @@ -2618,6 +2619,17 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr())); } + /** + * Returns the cardinality of a bitmap. + * + * <p>If the input is null, the result is null. + * + * @return a BIGINT expression + */ + public OutType bitmapCardinality() { + return toApiSpecificExpression(unresolvedCall(BITMAP_CARDINALITY, toExpr())); + } + /** * Converts an array of bytes to a bitmap. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ddbe1887fce..8cf4179605e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3007,6 +3007,19 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_CARDINALITY = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_CARDINALITY") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BIGINT()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.BitmapCardinalityFunction") + .build(); + public static final BuiltInFunctionDefinition BITMAP_FROM_BYTES = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_FROM_BYTES") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java index 121675f2318..4744067a8a5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java @@ -36,10 +36,20 @@ import static org.apache.flink.table.api.Expressions.$; /** Test bitmap functions correct behaviour. */ class BitmapFunctionsITCase extends BuiltInFunctionTestBase { + private static final Bitmap OVERSIZE_BITMAP; + + static { + // size 0x80000000L + OVERSIZE_BITMAP = Bitmap.empty(); + OVERSIZE_BITMAP.add(0L, Integer.MAX_VALUE); + OVERSIZE_BITMAP.add(Integer.MAX_VALUE); + } + @Override Stream<TestSetSpec> getTestSetSpecs() { final List<TestSetSpec> specs = new ArrayList<>(); specs.addAll(bitmapBuildTestCases()); + specs.addAll(bitmapCardinalityTestCases()); specs.addAll(bitmapFromBytesTestCases()); specs.addAll(bitmapToBytesTestCases()); return specs.stream(); @@ -104,6 +114,64 @@ class BitmapFunctionsITCase extends BuiltInFunctionTestBase { + "BITMAP_BUILD(array ARRAY<INT>)")); } + private List<TestSetSpec> bitmapCardinalityTestCases() { + return Arrays.asList( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_CARDINALITY) + .onFieldsWithData( + null, + Bitmap.empty(), + Bitmap.fromArray(new int[] {-1}), + Bitmap.fromArray(new int[] {1, 2, 3, -4}), + OVERSIZE_BITMAP) + .andDataTypes( + DataTypes.BITMAP(), + DataTypes.BITMAP(), + DataTypes.BITMAP(), + DataTypes.BITMAP().notNull(), + DataTypes.BITMAP().notNull()) + // null + .testResult( + $("f0").bitmapCardinality(), + "BITMAP_CARDINALITY(f0)", + null, + DataTypes.BIGINT()) + // empty + .testResult( + $("f1").bitmapCardinality(), + "BITMAP_CARDINALITY(f1)", + 0L, + DataTypes.BIGINT()) + // normal cases + .testResult( + $("f2").bitmapCardinality(), + "BITMAP_CARDINALITY(f2)", + 1L, + DataTypes.BIGINT()) + .testResult( + $("f3").bitmapCardinality(), + "BITMAP_CARDINALITY(f3)", + 4L, + DataTypes.BIGINT().notNull()) + // oversize + .testResult( + $("f4").bitmapCardinality(), + "BITMAP_CARDINALITY(f4)", + 0x80000000L, + DataTypes.BIGINT().notNull()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.BITMAP_CARDINALITY, "Validation Error") + .onFieldsWithData(1024, new int[] {1, 2}) + .andDataTypes(DataTypes.INT(), DataTypes.ARRAY(DataTypes.INT())) + .testTableApiValidationError( + $("f0").bitmapCardinality(), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_CARDINALITY(bitmap <BITMAP>)") + .testSqlValidationError( + "BITMAP_CARDINALITY(f1)", + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_CARDINALITY(bitmap <BITMAP>)")); + } + private List<TestSetSpec> bitmapFromBytesTestCases() { return Arrays.asList( TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_FROM_BYTES) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapCardinalityFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapCardinalityFunction.java new file mode 100644 index 00000000000..ab0328d04e8 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapCardinalityFunction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.types.bitmap.Bitmap; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_CARDINALITY}. */ +@Internal +public class BitmapCardinalityFunction extends BuiltInScalarFunction { + + public BitmapCardinalityFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.BITMAP_CARDINALITY, context); + } + + public Long eval(@Nullable Bitmap bitmap) { + if (bitmap == null) { + return null; + } + return bitmap.getLongCardinality(); + } +}
