This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit afe599dc95b7f7425fc29e72391d4e3d8ac5701e
Author: dylanhz <[email protected]>
AuthorDate: Tue Mar 24 01:34:02 2026 +0800

    [FLINK-39186][table] Add the built-in function BITMAP_FROM_BYTES, 
BITMAP_TO_BYTES
---
 docs/data/sql_functions.yml                        |  20 ++++
 docs/data/sql_functions_zh.yml                     |  20 ++++
 .../docs/reference/pyflink.table/expressions.rst   |   2 +
 flink-python/pyflink/table/expression.py           |  26 +++++
 .../pyflink/table/tests/test_expression.py         |   2 +
 .../flink/table/api/internal/BaseExpressions.java  |  32 ++++++
 .../functions/BuiltInFunctionDefinitions.java      |  27 +++++
 .../planner/functions/BitmapFunctionsITCase.java   | 127 +++++++++++++++++++++
 .../functions/scalar/BitmapFromBytesFunction.java  |  49 ++++++++
 .../functions/scalar/BitmapToBytesFunction.java    |  42 +++++++
 10 files changed, 347 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 66f70afc0ec..56d5491b77e 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1358,6 +1358,26 @@ bitmap:
       `array ARRAY<INT>`
       
       Returns a `BITMAP`. `NULL` if the argument is `NULL`.
+  - sql: BITMAP_FROM_BYTES(bytes)
+    table: bytes.bitmapFromBytes()
+    description: |
+      Converts an array of bytes to a bitmap.
+      
+      Following the format defined in [32-bit RoaringBitmap format 
specification](https://github.com/RoaringBitmap/RoaringFormatSpec).
+            
+      `bytes <BINARY | VARBINARY>`
+      
+      Returns a `BITMAP`. `NULL` if the argument is `NULL`.
+  - sql: BITMAP_TO_BYTES(bitmap)
+    table: bitmap.bitmapToBytes()
+    description: |
+      Converts a bitmap to an array of bytes.
+      
+      Following the format defined in [32-bit RoaringBitmap format 
specification](https://github.com/RoaringBitmap/RoaringFormatSpec).
+      
+      `bitmap BITMAP`
+      
+      Returns a `VARBINARY`. `NULL` if the argument is `NULL`.
 
 auxilary:
   - table: callSql(STRING)
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 7051643ec9b..1b56accd1d0 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1444,6 +1444,26 @@ bitmap:
       `array ARRAY<INT>`
       
       返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。
+  - sql: BITMAP_FROM_BYTES(bytes)
+    table: bytes.bitmapFromBytes()
+    description: |
+      将字节数组转换为位图。
+      
+      遵循 [32-bit RoaringBitmap format 
specification](https://github.com/RoaringBitmap/RoaringFormatSpec) 定义的格式。
+            
+      `bytes <BINARY | VARBINARY>`
+      
+      返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。
+  - sql: BITMAP_TO_BYTES(bitmap)
+    table: bitmap.bitmapToBytes()
+    description: |
+      将位图转换为字节数组。
+      
+      遵循 [32-bit RoaringBitmap format 
specification](https://github.com/RoaringBitmap/RoaringFormatSpec) 定义的格式。
+            
+      `bitmap BITMAP`
+      
+      返回一个 `VARBINARY`。如果参数为 `NULL`,则返回 `NULL`。
 
 auxilary:
   - table: callSql(STRING)
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 29ca42055c5..61bf4a8eef3 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -346,3 +346,5 @@ Bitmap functions
     :toctree: api/
 
     Expression.bitmap_build
+    Expression.bitmap_from_bytes
+    Expression.bitmap_to_bytes
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 650feedd303..8a26a887b45 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -2287,6 +2287,32 @@ class Expression(Generic[T]):
         """
         return _unary_op("bitmapBuild")(self)
 
+    def bitmap_from_bytes(self) -> 'Expression':
+        """
+        Converts an array of bytes to a bitmap.
+
+        Following the format defined in `32-bit RoaringBitmap format 
specification \
+        <https://github.com/RoaringBitmap/RoaringFormatSpec>`_.
+
+        If the input is null, the result is null.
+
+        :return: a BITMAP expression
+        """
+        return _unary_op("bitmapFromBytes")(self)
+
+    def bitmap_to_bytes(self) -> 'Expression':
+        """
+        Converts a bitmap to an array of bytes.
+
+        Following the format defined in `32-bit RoaringBitmap format 
specification \
+        <https://github.com/RoaringBitmap/RoaringFormatSpec>`_.
+
+        If the input is null, the result is null.
+
+        :return: a VARBINARY expression
+        """
+        return _unary_op("bitmapToBytes")(self)
+
 
 # add the docs
 _make_math_log_doc()
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 467c2dd6f6e..c6385bd488f 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -267,6 +267,8 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
 
         # bitmap functions
         self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build()))
+        self.assertEqual("BITMAP_FROM_BYTES(a)", 
str(expr1.bitmap_from_bytes()))
+        self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes()))
 
     def test_expressions(self):
         expr1 = col('a')
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index d4c24a86c49..c742b7ca958 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -77,6 +77,8 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
@@ -2615,4 +2617,34 @@ public abstract class BaseExpressions<InType, OutType> {
     public OutType bitmapBuild() {
         return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr()));
     }
+
+    /**
+     * Converts an array of bytes to a bitmap.
+     *
+     * <p>Following the format defined in <a
+     * href="https://github.com/RoaringBitmap/RoaringFormatSpec";>32-bit 
RoaringBitmap format
+     * specification</a>.
+     *
+     * <p>If the input is null, the result is null.
+     *
+     * @return a BITMAP expression
+     */
+    public OutType bitmapFromBytes() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_FROM_BYTES, 
toExpr()));
+    }
+
+    /**
+     * Converts a bitmap to an array of bytes.
+     *
+     * <p>Following the format defined in <a
+     * href="https://github.com/RoaringBitmap/RoaringFormatSpec";>32-bit 
RoaringBitmap format
+     * specification</a>.
+     *
+     * <p>If the input is null, the result is null.
+     *
+     * @return a VARBINARY expression
+     */
+    public OutType bitmapToBytes() {
+        return toApiSpecificExpression(unresolvedCall(BITMAP_TO_BYTES, 
toExpr()));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 244635257ba..ddbe1887fce 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -3007,6 +3007,33 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition BITMAP_FROM_BYTES =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_FROM_BYTES")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bytes"),
+                                    Collections.singletonList(
+                                            
logical(LogicalTypeFamily.BINARY_STRING))))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BITMAP())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.BitmapFromBytesFunction")
+                    .build();
+
+    public static final BuiltInFunctionDefinition BITMAP_TO_BYTES =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BITMAP_TO_BYTES")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Collections.singletonList("bitmap"),
+                                    
Collections.singletonList(logical(LogicalTypeRoot.BITMAP))))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BYTES())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.BitmapToBytesFunction")
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Other functions
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java
index e737523aecb..121675f2318 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapFunctionsITCase.java
@@ -19,9 +19,13 @@
 package org.apache.flink.table.planner.functions;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.types.bitmap.Bitmap;
 
+import org.roaringbitmap.RoaringBitmap;
+
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +40,8 @@ class BitmapFunctionsITCase extends BuiltInFunctionTestBase {
     Stream<TestSetSpec> getTestSetSpecs() {
         final List<TestSetSpec> specs = new ArrayList<>();
         specs.addAll(bitmapBuildTestCases());
+        specs.addAll(bitmapFromBytesTestCases());
+        specs.addAll(bitmapToBytesTestCases());
         return specs.stream();
     }
 
@@ -97,4 +103,125 @@ class BitmapFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "BITMAP_BUILD(array ARRAY<INT> NOT 
NULL)\n"
                                         + "BITMAP_BUILD(array ARRAY<INT>)"));
     }
+
+    private List<TestSetSpec> bitmapFromBytesTestCases() {
+        return Arrays.asList(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_FROM_BYTES)
+                        .onFieldsWithData(
+                                null,
+                                toSerializedBytes(),
+                                toSerializedBytes(-1),
+                                toSerializedBytes(1, 2, 3, -4))
+                        .andDataTypes(
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES().notNull())
+                        // null
+                        .testResult(
+                                $("f0").bitmapFromBytes(),
+                                "BITMAP_FROM_BYTES(f0)",
+                                null,
+                                DataTypes.BITMAP())
+                        // empty
+                        .testResult(
+                                $("f1").bitmapFromBytes(),
+                                "BITMAP_FROM_BYTES(f1)",
+                                Bitmap.empty(),
+                                DataTypes.BITMAP())
+                        // normal cases
+                        .testResult(
+                                $("f2").bitmapFromBytes(),
+                                "BITMAP_FROM_BYTES(f2)",
+                                Bitmap.fromArray(new int[] {-1}),
+                                DataTypes.BITMAP())
+                        .testResult(
+                                $("f3").bitmapFromBytes(),
+                                "BITMAP_FROM_BYTES(f3)",
+                                Bitmap.fromArray(new int[] {1, 2, 3, -4}),
+                                DataTypes.BITMAP().notNull()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, 
"Runtime Error")
+                        .onFieldsWithData("".getBytes(), "invalid".getBytes())
+                        .andDataTypes(DataTypes.BYTES(), DataTypes.BYTES())
+                        .testTableApiRuntimeError(
+                                $("f0").bitmapFromBytes(),
+                                TableRuntimeException.class,
+                                "Failed to deserialize bitmap from bytes.")
+                        .testSqlRuntimeError(
+                                "BITMAP_FROM_BYTES(f1)",
+                                TableRuntimeException.class,
+                                "Failed to deserialize bitmap from bytes."),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, 
"Validation Error")
+                        .onFieldsWithData("{1,2}", new int[] {1, 2})
+                        .andDataTypes(DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiValidationError(
+                                $("f0").bitmapFromBytes(),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_FROM_BYTES(bytes 
<BINARY_STRING>)")
+                        .testSqlValidationError(
+                                "BITMAP_FROM_BYTES(f1)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_FROM_BYTES(bytes 
<BINARY_STRING>)"));
+    }
+
+    private List<TestSetSpec> bitmapToBytesTestCases() {
+        return Arrays.asList(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_TO_BYTES)
+                        .onFieldsWithData(
+                                null,
+                                new Integer[] {-1},
+                                new Integer[] {Integer.MIN_VALUE, -1, 1, 2, 3})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()).notNull())
+                        // null
+                        .testResult(
+                                $("f0").bitmapBuild().bitmapToBytes(),
+                                "BITMAP_TO_BYTES(BITMAP_BUILD(f0))",
+                                null,
+                                DataTypes.BYTES())
+                        // empty
+                        .testResult(
+                                
$("f1").arrayRemove(-1).bitmapBuild().bitmapToBytes(),
+                                "BITMAP_TO_BYTES(BITMAP_BUILD(ARRAY_REMOVE(f1, 
-1)))",
+                                toSerializedBytes(),
+                                DataTypes.BYTES())
+                        // normal cases
+                        .testResult(
+                                $("f1").bitmapBuild().bitmapToBytes(),
+                                "BITMAP_TO_BYTES(BITMAP_BUILD(f1))",
+                                toSerializedBytes(-1),
+                                DataTypes.BYTES())
+                        .testResult(
+                                $("f2").bitmapBuild().bitmapToBytes(),
+                                "BITMAP_TO_BYTES(BITMAP_BUILD(f2))",
+                                toSerializedBytes(1, 2, 3, Integer.MIN_VALUE, 
-1),
+                                DataTypes.BYTES().notNull()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.BITMAP_TO_BYTES, 
"Validation Error")
+                        .onFieldsWithData(1024, new int[] {1, 2})
+                        .andDataTypes(DataTypes.INT(), 
DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiValidationError(
+                                $("f0").bitmapToBytes(),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_TO_BYTES(bitmap <BITMAP>)")
+                        .testSqlValidationError(
+                                "BITMAP_TO_BYTES(f1)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BITMAP_TO_BYTES(bitmap <BITMAP>)"));
+    }
+
+    // ~ Utils 
--------------------------------------------------------------------
+
+    private byte[] toSerializedBytes(int... values) {
+        assert values != null;
+        RoaringBitmap rb = RoaringBitmap.bitmapOf(values);
+        rb.runOptimize();
+        ByteBuffer buffer = ByteBuffer.allocate(rb.serializedSizeInBytes());
+        rb.serialize(buffer);
+        return buffer.array();
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java
new file mode 100644
index 00000000000..6b707b9861d
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapFromBytesFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.bitmap.Bitmap;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_FROM_BYTES}. */
+@Internal
+public class BitmapFromBytesFunction extends BuiltInScalarFunction {
+
+    public BitmapFromBytesFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.BITMAP_FROM_BYTES, context);
+    }
+
+    public Bitmap eval(@Nullable byte[] bytes) {
+        if (bytes == null) {
+            return null;
+        }
+
+        try {
+            return Bitmap.fromBytes(bytes);
+        } catch (DeserializationException e) {
+            throw new TableRuntimeException("Failed to deserialize bitmap from 
bytes.", e);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java
new file mode 100644
index 00000000000..31ffb8bb26a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BitmapToBytesFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.types.bitmap.Bitmap;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#BITMAP_TO_BYTES}. */
+@Internal
+public class BitmapToBytesFunction extends BuiltInScalarFunction {
+
+    public BitmapToBytesFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.BITMAP_TO_BYTES, context);
+    }
+
+    public byte[] eval(@Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return null;
+        }
+        return bitmap.toBytes();
+    }
+}

Reply via email to