This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0071c4d6e8060d94aee6beddc9e3fba190825b31 Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 01:33:06 2026 +0800 [FLINK-39186][table] Add the built-in function BITMAP_BUILD --- .../docs/sql/functions/built-in-functions.md | 4 + .../docs/sql/functions/built-in-functions.md | 4 + docs/data/sql_functions.yml | 10 +++ docs/data/sql_functions_zh.yml | 10 +++ .../docs/reference/pyflink.table/expressions.rst | 10 +++ flink-python/pyflink/table/expression.py | 12 +++ .../pyflink/table/tests/test_expression.py | 3 + .../flink/table/api/internal/BaseExpressions.java | 14 +++ .../functions/BuiltInFunctionDefinitions.java | 26 ++++++ .../planner/functions/BitmapFunctionsITCase.java | 100 +++++++++++++++++++++ .../functions/scalar/BitmapBuildFunction.java | 53 +++++++++++ 11 files changed, 246 insertions(+) diff --git a/docs/content.zh/docs/sql/functions/built-in-functions.md b/docs/content.zh/docs/sql/functions/built-in-functions.md index 0f1db4863dc..70fa6bc5e84 100644 --- a/docs/content.zh/docs/sql/functions/built-in-functions.md +++ b/docs/content.zh/docs/sql/functions/built-in-functions.md @@ -107,6 +107,10 @@ JSON 函数使用符合 ISO/IEC TR 19075-6 SQL标准的 JSON 路径表达式。 {{< sql_functions_zh "hashfunctions" >}} +### 位图函数 + +{{< sql_functions_zh "bitmap" >}} + ### 辅助函数 {{< sql_functions_zh "auxiliary" >}} diff --git a/docs/content/docs/sql/functions/built-in-functions.md b/docs/content/docs/sql/functions/built-in-functions.md index 53a2fdcc58e..f85ae93e486 100644 --- a/docs/content/docs/sql/functions/built-in-functions.md +++ b/docs/content/docs/sql/functions/built-in-functions.md @@ -110,6 +110,10 @@ Known Limitations: {{< sql_functions "hashfunctions" >}} +### Bitmap Functions + +{{< sql_functions "bitmap" >}} + ### Auxiliary Functions {{< sql_functions "auxiliary" >}} diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 2091a451aff..66f70afc0ec 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1349,6 +1349,16 @@ 
hashfunctions: table: STRING.sha2(INT) description: Returns the hash using the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, or SHA-512). The first argument string is the string to be hashed and the second argument hashLength is the bit length of the result (224, 256, 384, or 512). Returns NULL if string or hashLength is NULL. +bitmap: + - sql: BITMAP_BUILD(array) + table: array.bitmapBuild() + description: | + Creates a bitmap from an array of 32-bit integers. + + `array ARRAY<INT>` + + Returns a `BITMAP`. `NULL` if the argument is `NULL`. + auxilary: - table: callSql(STRING) description: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index afde8237a99..7051643ec9b 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1435,6 +1435,16 @@ hashfunctions: 使用 SHA-2 系列散列函数(SHA-224,SHA-256,SHA-384 或 SHA-512)返回散列值。第一个参数 string 是要散列的字符串, 第二个参数 hashLength 是结果的位数(224,256,384 或 512)。如果 string 或 hashLength 为 `NULL`,则返回 `NULL`。 +bitmap: + - sql: BITMAP_BUILD(array) + table: array.bitmapBuild() + description: | + 从 32 位整数数组创建位图。 + + `array ARRAY<INT>` + + 返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。 + auxilary: - table: callSql(STRING) description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index c2ab212c561..29ca42055c5 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -336,3 +336,13 @@ value modification functions :toctree: api/ Expression.object_update + +Bitmap functions +---------------- + +.. currentmodule:: pyflink.table.expression + +.. 
autosummary:: + :toctree: api/ + + Expression.bitmap_build diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 7b1e16c0a60..650feedd303 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2275,6 +2275,18 @@ class Expression(Generic[T]): """ return _varargs_op("objectUpdate")(self, *kv) + # ---------------------------- Bitmap functions ----------------------------- + + def bitmap_build(self) -> 'Expression': + """ + Creates a bitmap from an array of 32-bit integers. + + If the input is null, the result is null. + + :return: a BITMAP expression + """ + return _unary_op("bitmapBuild")(self) + # add the docs _make_math_log_doc() diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 536ee1a08e9..467c2dd6f6e 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -265,6 +265,9 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): JsonQueryOnEmptyOrError.NULL, JsonQueryOnEmptyOrError.EMPTY_ARRAY))) + # bitmap functions + self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) + def test_expressions(self): expr1 = col('a') expr2 = col('b') diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 21930bed89b..d4c24a86c49 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -76,6 +76,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ATAN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG; import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; @@ -2601,4 +2602,17 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression( ApiExpressionUtils.unresolvedCall(OBJECT_UPDATE, expressions)); } + + // Bitmap functions + + /** + * Creates a bitmap from an array of 32-bit integers. + * + * <p>If the input is null, the result is null. Null elements of the array are skipped. + * + * @return a BITMAP expression + */ + public OutType bitmapBuild() { + return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 1499b1c0a60..244635257ba 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -2981,6 +2981,32 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.TryParseJsonFunction") .build(); + // -------------------------------------------------------------------------------------------- + // Bitmap functions + // -------------------------------------------------------------------------------------------- + /** Scalar function {@code BITMAP_BUILD(array)}. Accepts a single {@code ARRAY<INT>} argument, listed as two explicit signatures (NOT NULL and nullable), and produces a {@code BITMAP}; the output is wrapped in {@code nullableIfArgs}, so the result is presumably nullable only when the argument is. Evaluated at runtime by {@code BitmapBuildFunction}. */ + public static final BuiltInFunctionDefinition BITMAP_BUILD = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_BUILD") + .kind(SCALAR) + 
.inputTypeStrategy( + or( + sequence( + Collections.singletonList("array"), + Collections.singletonList( + InputTypeStrategies.explicit( + DataTypes.ARRAY(DataTypes.INT()) + .notNull()))), + sequence( + Collections.singletonList("array"), + Collections.singletonList( + InputTypeStrategies.explicit( + DataTypes.ARRAY(DataTypes.INT())))))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BITMAP()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction") + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java new file mode 100644 index 00000000000..e737523aecb --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.types.bitmap.Bitmap; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.Expressions.$; + +/** Tests for the built-in bitmap functions (currently only {@code BITMAP_BUILD}). */ +class BitmapFunctionsITCase extends BuiltInFunctionTestBase { + + @Override + Stream<TestSetSpec> getTestSetSpecs() { + final List<TestSetSpec> specs = new ArrayList<>(); + specs.addAll(bitmapBuildTestCases()); + return specs.stream(); + } + /** Specs for {@code BITMAP_BUILD}: result checks for a null array, an array with a null element, an empty array and regular arrays, followed by validation errors for arguments that are not {@code ARRAY<INT>}. */ + private List<TestSetSpec> bitmapBuildTestCases() { + return Arrays.asList( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD) + .onFieldsWithData( + null, + new Integer[] {1, null, 1}, + new Integer[] {-1}, + new Integer[] {1, 2, 3, -4}) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()).notNull()) + // null array + .testResult( + $("f0").bitmapBuild(), "BITMAP_BUILD(f0)", null, DataTypes.BITMAP()) + // null array element + .testResult( + $("f1").arrayRemove(1).bitmapBuild(), + "BITMAP_BUILD(ARRAY_REMOVE(f1, 1))", + Bitmap.empty(), + DataTypes.BITMAP()) + .testResult( + $("f1").bitmapBuild(), + "BITMAP_BUILD(f1)", + Bitmap.fromArray(new int[] {1}), + DataTypes.BITMAP()) + // empty array + .testResult( + $("f2").arrayRemove(-1).bitmapBuild(), + "BITMAP_BUILD(ARRAY_REMOVE(f2, -1))", + Bitmap.empty(), + DataTypes.BITMAP()) + // normal cases + .testResult( + $("f2").bitmapBuild(), + "BITMAP_BUILD(f2)", + Bitmap.fromArray(new int[] {-1}), + DataTypes.BITMAP()) + .testResult( + $("f3").bitmapBuild(), + "BITMAP_BUILD(f3)", + Bitmap.fromArray(new int[] {1, 2, 3, -4}), + DataTypes.BITMAP().notNull()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD, 
"Validation Error") /* f0 is INT (used for the Table API check) and f1 is ARRAY<BIGINT> (used for the SQL check); neither matches ARRAY<INT>, so both must be rejected */ + .onFieldsWithData(1024, new long[] {1L, 2L}) + .andDataTypes(DataTypes.INT(), DataTypes.ARRAY(DataTypes.BIGINT())) + .testTableApiValidationError( + $("f0").bitmapBuild(), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_BUILD(array ARRAY<INT> NOT NULL)\n" + + "BITMAP_BUILD(array ARRAY<INT>)") + .testSqlValidationError( + "BITMAP_BUILD(f1)", + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_BUILD(array ARRAY<INT> NOT NULL)\n" + + "BITMAP_BUILD(array ARRAY<INT>)")); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapBuildFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapBuildFunction.java new file mode 100644 index 00000000000..f2f18e7c3f4 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapBuildFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.types.bitmap.Bitmap; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_BUILD}: builds a {@link Bitmap} from the non-null INT elements of an array. */ +@Internal +public class BitmapBuildFunction extends BuiltInScalarFunction { + + public BitmapBuildFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.BITMAP_BUILD, context); + } + /** Returns {@code null} when {@code array} is null; otherwise copies the non-null elements into an {@code int[]} and adds them to a fresh empty bitmap, so an empty or all-null array yields an empty bitmap. */ + public Bitmap eval(@Nullable ArrayData array) { + if (array == null) { + return null; + } + /* values is sized for the worst case (no null elements); count tracks how many slots are actually filled, and only that prefix is handed to addN (arguments presumably offset and length) */ + int count = 0; + int[] values = new int[array.size()]; + for (int i = 0; i < array.size(); i++) { + if (!array.isNullAt(i)) { + values[count++] = array.getInt(i); + } + } + Bitmap bitmap = Bitmap.empty(); + bitmap.addN(values, 0, count); + return bitmap; + } +}
