This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit afe599dc95b7f7425fc29e72391d4e3d8ac5701e Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 01:34:02 2026 +0800 [FLINK-39186][table] Add the built-in function BITMAP_FROM_BYTES, BITMAP_TO_BYTES --- docs/data/sql_functions.yml | 20 ++++ docs/data/sql_functions_zh.yml | 20 ++++ .../docs/reference/pyflink.table/expressions.rst | 2 + flink-python/pyflink/table/expression.py | 26 +++++ .../pyflink/table/tests/test_expression.py | 2 + .../flink/table/api/internal/BaseExpressions.java | 32 ++++++ .../functions/BuiltInFunctionDefinitions.java | 27 +++++ .../planner/functions/BitmapFunctionsITCase.java | 127 +++++++++++++++++++++ .../functions/scalar/BitmapFromBytesFunction.java | 49 ++++++++ .../functions/scalar/BitmapToBytesFunction.java | 42 +++++++ 10 files changed, 347 insertions(+) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 66f70afc0ec..56d5491b77e 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1358,6 +1358,26 @@ bitmap: `array ARRAY<INT>` Returns a `BITMAP`. `NULL` if the argument is `NULL`. + - sql: BITMAP_FROM_BYTES(bytes) + table: bytes.bitmapFromBytes() + description: | + Converts an array of bytes to a bitmap. + + Following the format defined in [32-bit RoaringBitmap format specification](https://github.com/RoaringBitmap/RoaringFormatSpec). + + `bytes <BINARY | VARBINARY>` + + Returns a `BITMAP`. `NULL` if the argument is `NULL`. + - sql: BITMAP_TO_BYTES(bitmap) + table: bitmap.bitmapToBytes() + description: | + Converts a bitmap to an array of bytes. + + Following the format defined in [32-bit RoaringBitmap format specification](https://github.com/RoaringBitmap/RoaringFormatSpec). + + `bitmap BITMAP` + + Returns a `VARBINARY`. `NULL` if the argument is `NULL`. auxilary: - table: callSql(STRING) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 7051643ec9b..1b56accd1d0 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1444,6 +1444,26 @@ bitmap: `array ARRAY<INT>` 返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_FROM_BYTES(bytes) + table: bytes.bitmapFromBytes() + description: | + 将字节数组转换为位图。 + + 遵循 [32-bit RoaringBitmap format specification](https://github.com/RoaringBitmap/RoaringFormatSpec) 定义的格式。 + + `bytes <BINARY | VARBINARY>` + + 返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_TO_BYTES(bitmap) + table: bitmap.bitmapToBytes() + description: | + 将位图转换为字节数组。 + + 遵循 [32-bit RoaringBitmap format specification](https://github.com/RoaringBitmap/RoaringFormatSpec) 定义的格式。 + + `bitmap BITMAP` + + 返回一个 `VARBINARY`。如果参数为 `NULL`,则返回 `NULL`。 auxilary: - table: callSql(STRING) diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 29ca42055c5..61bf4a8eef3 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -346,3 +346,5 @@ Bitmap functions :toctree: api/ Expression.bitmap_build + Expression.bitmap_from_bytes + Expression.bitmap_to_bytes diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 650feedd303..8a26a887b45 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2287,6 +2287,32 @@ class Expression(Generic[T]): """ return _unary_op("bitmapBuild")(self) + def bitmap_from_bytes(self) -> 'Expression': + """ + Converts an array of bytes to a bitmap. + + Following the format defined in `32-bit RoaringBitmap format specification \ + <https://github.com/RoaringBitmap/RoaringFormatSpec>`_. + + If the input is null, the result is null. + + :return: a BITMAP expression + """ + return _unary_op("bitmapFromBytes")(self) + + def bitmap_to_bytes(self) -> 'Expression': + """ + Converts a bitmap to an array of bytes. + + Following the format defined in `32-bit RoaringBitmap format specification \ + <https://github.com/RoaringBitmap/RoaringFormatSpec>`_. + + If the input is null, the result is null. + + :return: a VARBINARY expression + """ + return _unary_op("bitmapToBytes")(self) + # add the docs _make_math_log_doc() diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 467c2dd6f6e..c6385bd488f 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -267,6 +267,8 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): # bitmap functions self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) + self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) + self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes())) def test_expressions(self): expr1 = col('a') diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index d4c24a86c49..c742b7ca958 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -77,6 +77,8 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; @@ -2615,4 +2617,34 @@ public abstract class BaseExpressions<InType, OutType> { public OutType bitmapBuild() { return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr())); } + + /** + * Converts an array of bytes to a bitmap. + * + * <p>Following the format defined in <a + * href="https://github.com/RoaringBitmap/RoaringFormatSpec">32-bit RoaringBitmap format + * specification</a>. + * + * <p>If the input is null, the result is null. + * + * @return a BITMAP expression + */ + public OutType bitmapFromBytes() { + return toApiSpecificExpression(unresolvedCall(BITMAP_FROM_BYTES, toExpr())); + } + + /** + * Converts a bitmap to an array of bytes. + * + * <p>Following the format defined in <a + * href="https://github.com/RoaringBitmap/RoaringFormatSpec">32-bit RoaringBitmap format + * specification</a>. + * + * <p>If the input is null, the result is null. + * + * @return a VARBINARY expression + */ + public OutType bitmapToBytes() { + return toApiSpecificExpression(unresolvedCall(BITMAP_TO_BYTES, toExpr())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 244635257ba..ddbe1887fce 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3007,6 +3007,33 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_FROM_BYTES = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_FROM_BYTES") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + Collections.singletonList("bytes"), + Collections.singletonList( + logical(LogicalTypeFamily.BINARY_STRING)))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BITMAP()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.BitmapFromBytesFunction") + .build(); + + public static final BuiltInFunctionDefinition BITMAP_TO_BYTES = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_TO_BYTES") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BYTES()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.BitmapToBytesFunction") + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java index e737523aecb..121675f2318 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java @@ -19,9 +19,13 @@ package org.apache.flink.table.planner.functions; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableRuntimeException; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.types.bitmap.Bitmap; +import org.roaringbitmap.RoaringBitmap; + +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -36,6 +40,8 @@ class BitmapFunctionsITCase extends BuiltInFunctionTestBase { Stream<TestSetSpec> getTestSetSpecs() { final List<TestSetSpec> specs = new ArrayList<>(); specs.addAll(bitmapBuildTestCases()); + specs.addAll(bitmapFromBytesTestCases()); + specs.addAll(bitmapToBytesTestCases()); return specs.stream(); } @@ -97,4 +103,125 @@ class BitmapFunctionsITCase extends BuiltInFunctionTestBase { + "BITMAP_BUILD(array ARRAY<INT> NOT NULL)\n" + "BITMAP_BUILD(array ARRAY<INT>)")); } + + private List<TestSetSpec> bitmapFromBytesTestCases() { + return Arrays.asList( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_FROM_BYTES) + .onFieldsWithData( + null, + toSerializedBytes(), + toSerializedBytes(-1), + toSerializedBytes(1, 2, 3, -4)) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES().notNull()) + // null + .testResult( + $("f0").bitmapFromBytes(), + "BITMAP_FROM_BYTES(f0)", + null, + DataTypes.BITMAP()) + // empty + .testResult( + $("f1").bitmapFromBytes(), + "BITMAP_FROM_BYTES(f1)", + Bitmap.empty(), + DataTypes.BITMAP()) + // normal cases + .testResult( + $("f2").bitmapFromBytes(), + "BITMAP_FROM_BYTES(f2)", + Bitmap.fromArray(new int[] {-1}), + DataTypes.BITMAP()) + .testResult( + $("f3").bitmapFromBytes(), + "BITMAP_FROM_BYTES(f3)", + Bitmap.fromArray(new int[] {1, 2, 3, -4}), + DataTypes.BITMAP().notNull()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, "Runtime Error") + .onFieldsWithData("".getBytes(), "invalid".getBytes()) + .andDataTypes(DataTypes.BYTES(), DataTypes.BYTES()) + .testTableApiRuntimeError( + $("f0").bitmapFromBytes(), + TableRuntimeException.class, + "Failed to deserialize bitmap from bytes.") + .testSqlRuntimeError( + "BITMAP_FROM_BYTES(f1)", + TableRuntimeException.class, + "Failed to deserialize bitmap from bytes."), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, "Validation Error") + .onFieldsWithData("{1,2}", new int[] {1, 2}) + .andDataTypes(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())) + .testTableApiValidationError( + $("f0").bitmapFromBytes(), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_FROM_BYTES(bytes <BINARY_STRING>)") + .testSqlValidationError( + "BITMAP_FROM_BYTES(f1)", + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_FROM_BYTES(bytes <BINARY_STRING>)")); + } + + private List<TestSetSpec> bitmapToBytesTestCases() { + return Arrays.asList( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_TO_BYTES) + .onFieldsWithData( + null, + new Integer[] {-1}, + new Integer[] {Integer.MIN_VALUE, -1, 1, 2, 3}) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()).notNull()) + // null + .testResult( + $("f0").bitmapBuild().bitmapToBytes(), + "BITMAP_TO_BYTES(BITMAP_BUILD(f0))", + null, + DataTypes.BYTES()) + // empty + .testResult( + $("f1").arrayRemove(-1).bitmapBuild().bitmapToBytes(), + "BITMAP_TO_BYTES(BITMAP_BUILD(ARRAY_REMOVE(f1, -1)))", + toSerializedBytes(), + DataTypes.BYTES()) + // normal cases + .testResult( + $("f1").bitmapBuild().bitmapToBytes(), + "BITMAP_TO_BYTES(BITMAP_BUILD(f1))", + toSerializedBytes(-1), + DataTypes.BYTES()) + .testResult( + $("f2").bitmapBuild().bitmapToBytes(), + "BITMAP_TO_BYTES(BITMAP_BUILD(f2))", + toSerializedBytes(1, 2, 3, Integer.MIN_VALUE, -1), + DataTypes.BYTES().notNull()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.BITMAP_TO_BYTES, "Validation Error") + .onFieldsWithData(1024, new int[] {1, 2}) + .andDataTypes(DataTypes.INT(), DataTypes.ARRAY(DataTypes.INT())) + .testTableApiValidationError( + $("f0").bitmapToBytes(), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_TO_BYTES(bitmap <BITMAP>)") + .testSqlValidationError( + "BITMAP_TO_BYTES(f1)", + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_TO_BYTES(bitmap <BITMAP>)")); + } + + // ~ Utils -------------------------------------------------------------------- + + private byte[] toSerializedBytes(int... values) { + assert values != null; + RoaringBitmap rb = RoaringBitmap.bitmapOf(values); + rb.runOptimize(); + ByteBuffer buffer = ByteBuffer.allocate(rb.serializedSizeInBytes()); + rb.serialize(buffer); + return buffer.array(); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java new file mode 100644 index 00000000000..6b707b9861d --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.types.DeserializationException; +import org.apache.flink.types.bitmap.Bitmap; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_FROM_BYTES}. */ +@Internal +public class BitmapFromBytesFunction extends BuiltInScalarFunction { + + public BitmapFromBytesFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, context); + } + + public Bitmap eval(@Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + + try { + return Bitmap.fromBytes(bytes); + } catch (DeserializationException e) { + throw new TableRuntimeException("Failed to deserialize bitmap from bytes.", e); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java new file mode 100644 index 00000000000..31ffb8bb26a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.types.bitmap.Bitmap; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_TO_BYTES}. */ +@Internal +public class BitmapToBytesFunction extends BuiltInScalarFunction { + + public BitmapToBytesFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.BITMAP_TO_BYTES, context); + } + + public byte[] eval(@Nullable Bitmap bitmap) { + if (bitmap == null) { + return null; + } + return bitmap.toBytes(); + } +}
