This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4cdf98f [core] introduce rbm32/rbm64 aggregate function. (#3724)
4f4cdf98f is described below

commit 4f4cdf98fdf1314c161f8b68ca23b430a47d3283
Author: zhoulii <[email protected]>
AuthorDate: Thu Jul 11 12:57:03 2024 +0800

    [core] introduce rbm32/rbm64 aggregate function. (#3724)
---
 docs/content/primary-key-table/merge-engine.md     |   8 ++
 .../org/apache/paimon/utils/RoaringBitmap32.java   |  27 +++++
 .../org/apache/paimon/utils/RoaringBitmap64.java   |  89 +++++++++++++++
 .../compact/aggregate/FieldAggregator.java         |  14 +++
 .../compact/aggregate/FieldRoaringBitmap32Agg.java |  68 +++++++++++
 .../compact/aggregate/FieldRoaringBitmap64Agg.java |  68 +++++++++++
 .../compact/aggregate/FieldAggregatorTest.java     |  49 ++++++++
 .../apache/paimon/flink/PreAggregationITCase.java  | 127 +++++++++++++++++++++
 8 files changed, 450 insertions(+)

diff --git a/docs/content/primary-key-table/merge-engine.md 
b/docs/content/primary-key-table/merge-engine.md
index a01a26c28..6596e975f 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -247,6 +247,14 @@ Current supported aggregate functions and data types are:
   The first_non_null_value function selects the first non-null value in a data 
set.
   It supports all data types.
 
+* `rbm32`:
+  The rbm32 function aggregates multiple serialized 32-bit RoaringBitmap into 
a single RoaringBitmap.
+  It supports VARBINARY data type.
+
+* `rbm64`:
+  The rbm64 function aggregates multiple serialized 64-bit Roaring64Bitmap 
into a single Roaring64Bitmap.
+  It supports VARBINARY data type.
+
 * `nested_update`:
   The nested_update function collects multiple rows into one array<row> 
(so-called 'nested table'). It supports ARRAY<ROW> data types.
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index ee83d1232..618f979ea 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -18,11 +18,14 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+
 import org.roaringbitmap.RoaringBitmap;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 /** A compressed bitmap for 32-bit integer. */
@@ -80,4 +83,28 @@ public class RoaringBitmap32 {
         RoaringBitmap32 that = (RoaringBitmap32) o;
         return Objects.equals(this.roaringBitmap, that.roaringBitmap);
     }
+
+    public void clear() {
+        roaringBitmap.clear();
+    }
+
+    public byte[] serialize() {
+        roaringBitmap.runOptimize();
+        ByteBuffer buffer = 
ByteBuffer.allocate(roaringBitmap.serializedSizeInBytes());
+        roaringBitmap.serialize(buffer);
+        return buffer.array();
+    }
+
+    public void deserialize(byte[] rbmBytes) throws IOException {
+        roaringBitmap.deserialize(ByteBuffer.wrap(rbmBytes));
+    }
+
+    @VisibleForTesting
+    public static RoaringBitmap32 bitmapOf(int... dat) {
+        RoaringBitmap32 roaringBitmap32 = new RoaringBitmap32();
+        for (int ele : dat) {
+            roaringBitmap32.add(ele);
+        }
+        return roaringBitmap32;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap64.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap64.java
new file mode 100644
index 000000000..da31042a7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap64.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/** A compressed bitmap for 64-bit integer. */
+public class RoaringBitmap64 {
+
+    private final Roaring64Bitmap roaringBitmap;
+
+    public RoaringBitmap64() {
+        this.roaringBitmap = new Roaring64Bitmap();
+    }
+
+    public void add(long x) {
+        roaringBitmap.add(x);
+    }
+
+    public void or(RoaringBitmap64 other) {
+        roaringBitmap.or(other.roaringBitmap);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RoaringBitmap64 that = (RoaringBitmap64) o;
+        return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+    }
+
+    public void clear() {
+        roaringBitmap.clear();
+    }
+
+    public byte[] serialize() throws IOException {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos)) {
+            roaringBitmap.runOptimize();
+            roaringBitmap.serialize(dos);
+            return bos.toByteArray();
+        }
+    }
+
+    public void deserialize(byte[] rbmBytes) throws IOException {
+        try (ByteArrayInputStream bis = new ByteArrayInputStream(rbmBytes);
+                DataInputStream dis = new DataInputStream(bis)) {
+            roaringBitmap.deserialize(dis);
+        }
+    }
+
+    @VisibleForTesting
+    public static RoaringBitmap64 bitmapOf(long... dat) {
+        RoaringBitmap64 roaringBitmap64 = new RoaringBitmap64();
+        for (long ele : dat) {
+            roaringBitmap64.add(ele);
+        }
+        return roaringBitmap64;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index 1ce92c4cd..f19506f4b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -124,6 +124,20 @@ public abstract class FieldAggregator implements 
Serializable {
                                 fieldType);
                         fieldAggregator = new 
FieldThetaSketchAgg((VarBinaryType) fieldType);
                         break;
+                    case FieldRoaringBitmap32Agg.NAME:
+                        checkArgument(
+                                fieldType instanceof VarBinaryType,
+                                "Data type for roaring bitmap column must be 
'VarBinaryType' but was '%s'.",
+                                fieldType);
+                        fieldAggregator = new 
FieldRoaringBitmap32Agg((VarBinaryType) fieldType);
+                        break;
+                    case FieldRoaringBitmap64Agg.NAME:
+                        checkArgument(
+                                fieldType instanceof VarBinaryType,
+                                "Data type for roaring bitmap column must be 
'VarBinaryType' but was '%s'.",
+                                fieldType);
+                        fieldAggregator = new 
FieldRoaringBitmap64Agg((VarBinaryType) fieldType);
+                        break;
                     default:
                         throw new RuntimeException(
                                 String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
new file mode 100644
index 000000000..15cbc2b96
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.io.IOException;
+
+/** roaring bitmap aggregate a field of a row. */
+public class FieldRoaringBitmap32Agg extends FieldAggregator {
+
+    public static final String NAME = "rbm32";
+
+    private static final long serialVersionUID = 1L;
+    private final RoaringBitmap32 roaringBitmapAcc;
+    private final RoaringBitmap32 roaringBitmapInput;
+
+    public FieldRoaringBitmap32Agg(VarBinaryType dataType) {
+        super(dataType);
+        this.roaringBitmapAcc = new RoaringBitmap32();
+        this.roaringBitmapInput = new RoaringBitmap32();
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null && inputField == null) {
+            return null;
+        }
+
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        try {
+            roaringBitmapAcc.deserialize((byte[]) accumulator);
+            roaringBitmapInput.deserialize((byte[]) inputField);
+            roaringBitmapAcc.or(roaringBitmapInput);
+            return roaringBitmapAcc.serialize();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to se/deserialize roaring 
bitmap.", e);
+        } finally {
+            roaringBitmapAcc.clear();
+            roaringBitmapInput.clear();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
new file mode 100644
index 000000000..aa9cff1fe
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.RoaringBitmap64;
+
+import java.io.IOException;
+
+/** roaring bitmap aggregate a field of a row. */
+public class FieldRoaringBitmap64Agg extends FieldAggregator {
+
+    public static final String NAME = "rbm64";
+
+    private static final long serialVersionUID = 1L;
+    private final RoaringBitmap64 roaringBitmapAcc;
+    private final RoaringBitmap64 roaringBitmapInput;
+
+    public FieldRoaringBitmap64Agg(VarBinaryType dataType) {
+        super(dataType);
+        this.roaringBitmapAcc = new RoaringBitmap64();
+        this.roaringBitmapInput = new RoaringBitmap64();
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null && inputField == null) {
+            return null;
+        }
+
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        try {
+            roaringBitmapAcc.deserialize((byte[]) accumulator);
+            roaringBitmapInput.deserialize((byte[]) inputField);
+            roaringBitmapAcc.or(roaringBitmapInput);
+            return roaringBitmapAcc.serialize();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to se/deserialize roaring 
bitmap.", e);
+        } finally {
+            roaringBitmapAcc.clear();
+            roaringBitmapInput.clear();
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index cc3f6813d..bf90c074b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -40,9 +40,12 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.SmallIntType;
 import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.RoaringBitmap32;
+import org.apache.paimon.utils.RoaringBitmap64;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
@@ -749,6 +752,52 @@ public class FieldAggregatorTest {
         assertThat(result4).isEqualTo(acc2);
     }
 
+    @Test
+    public void testFieldRoaringBitmap32Agg() {
+        FieldRoaringBitmap32Agg agg = new 
FieldRoaringBitmap32Agg(DataTypes.VARBINARY(20));
+
+        byte[] inputVal = RoaringBitmap32.bitmapOf(1).serialize();
+        byte[] acc1 = RoaringBitmap32.bitmapOf(2, 3).serialize();
+        byte[] acc2 = RoaringBitmap32.bitmapOf(1, 2, 3).serialize();
+
+        assertThat(agg.agg(null, null)).isNull();
+
+        byte[] result1 = (byte[]) agg.agg(null, inputVal);
+        assertThat(inputVal).isEqualTo(result1);
+
+        byte[] result2 = (byte[]) agg.agg(acc1, null);
+        assertThat(result2).isEqualTo(acc1);
+
+        byte[] result3 = (byte[]) agg.agg(acc1, inputVal);
+        assertThat(result3).isEqualTo(acc2);
+
+        byte[] result4 = (byte[]) agg.agg(acc2, inputVal);
+        assertThat(result4).isEqualTo(acc2);
+    }
+
+    @Test
+    public void testFieldRoaringBitmap64Agg() throws IOException {
+        FieldRoaringBitmap64Agg agg = new 
FieldRoaringBitmap64Agg(DataTypes.VARBINARY(20));
+
+        byte[] inputVal = RoaringBitmap64.bitmapOf(1L).serialize();
+        byte[] acc1 = RoaringBitmap64.bitmapOf(2L, 3L).serialize();
+        byte[] acc2 = RoaringBitmap64.bitmapOf(1L, 2L, 3L).serialize();
+
+        assertThat(agg.agg(null, null)).isNull();
+
+        byte[] result1 = (byte[]) agg.agg(null, inputVal);
+        assertThat(inputVal).isEqualTo(result1);
+
+        byte[] result2 = (byte[]) agg.agg(acc1, null);
+        assertThat(result2).isEqualTo(acc1);
+
+        byte[] result3 = (byte[]) agg.agg(acc1, inputVal);
+        assertThat(result3).isEqualTo(acc2);
+
+        byte[] result4 = (byte[]) agg.agg(acc2, inputVal);
+        assertThat(result4).isEqualTo(acc2);
+    }
+
     private Map<Object, Object> toMap(Object... kvs) {
         Map<Object, Object> result = new HashMap<>();
         for (int i = 0; i < kvs.length; i += 2) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 2403b876a..cc828c1bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -22,6 +22,8 @@ import 
org.apache.paimon.mergetree.compact.aggregate.FieldCollectAgg;
 import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
 import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
 import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.RoaringBitmap32;
+import org.apache.paimon.utils.RoaringBitmap64;
 
 import org.apache.commons.codec.binary.Hex;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -35,6 +37,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -1857,4 +1860,128 @@ public class PreAggregationITCase {
             assertThat(row.getField(1)).isEqualTo(expected);
         }
     }
+
+    /**
+     * ITCase for {@link 
org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap32Agg} &
+     * {@link 
org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap64Agg}.
+     */
+    public static class RoaringBitmapAggAggregationITCase extends 
CatalogITCaseBase {
+
+        @Test
+        public void testRoaring32BitmapAgg() throws IOException {
+            sql(
+                    "CREATE TABLE test_rbm64("
+                            + "  id INT PRIMARY KEY NOT ENFORCED,"
+                            + "  f0 VARBINARY"
+                            + ") WITH ("
+                            + "  'merge-engine' = 'aggregation',"
+                            + "  'fields.f0.aggregate-function' = 'rbm32'"
+                            + ")");
+
+            byte[] v1Bytes = RoaringBitmap32.bitmapOf(1).serialize();
+            byte[] v2Bytes = RoaringBitmap32.bitmapOf(2).serialize();
+            byte[] v3Bytes = RoaringBitmap32.bitmapOf(3).serialize();
+            byte[] v4Bytes = RoaringBitmap32.bitmapOf(1, 2).serialize();
+            byte[] v5Bytes = RoaringBitmap32.bitmapOf(2, 3).serialize();
+            String v1 = Hex.encodeHexString(v1Bytes).toUpperCase();
+            String v2 = Hex.encodeHexString(v2Bytes).toUpperCase();
+            String v3 = Hex.encodeHexString(v3Bytes).toUpperCase();
+
+            sql(
+                    "INSERT INTO test_rbm64 VALUES "
+                            + "(1, CAST (NULL AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v1
+                            + "' AS VARBINARY)), "
+                            + "(3, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY))");
+
+            List<Row> result = queryAndSort("SELECT * FROM test_rbm64");
+            checkOneRecord(result.get(0), 1, null);
+            checkOneRecord(result.get(1), 2, v1Bytes);
+            checkOneRecord(result.get(2), 3, v2Bytes);
+
+            sql(
+                    "INSERT INTO test_rbm64 VALUES "
+                            + "(1, CAST (x'"
+                            + v1
+                            + "' AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY)), "
+                            + "(3, CAST (x'"
+                            + v3
+                            + "' AS VARBINARY))");
+
+            result = queryAndSort("SELECT * FROM test_rbm64");
+            checkOneRecord(result.get(0), 1, v1Bytes);
+            checkOneRecord(result.get(1), 2, v4Bytes);
+            checkOneRecord(result.get(2), 3, v5Bytes);
+        }
+
+        @Test
+        public void testRoaring64BitmapAgg() throws IOException {
+            sql(
+                    "CREATE TABLE test_rbm64("
+                            + "  id INT PRIMARY KEY NOT ENFORCED,"
+                            + "  f0 VARBINARY"
+                            + ") WITH ("
+                            + "  'merge-engine' = 'aggregation',"
+                            + "  'fields.f0.aggregate-function' = 'rbm64'"
+                            + ")");
+
+            byte[] v1Bytes = RoaringBitmap64.bitmapOf(1L).serialize();
+            byte[] v2Bytes = RoaringBitmap64.bitmapOf(2L).serialize();
+            byte[] v3Bytes = RoaringBitmap64.bitmapOf(3L).serialize();
+            byte[] v4Bytes = RoaringBitmap64.bitmapOf(1L, 2L).serialize();
+            byte[] v5Bytes = RoaringBitmap64.bitmapOf(2L, 3L).serialize();
+            String v1 = Hex.encodeHexString(v1Bytes).toUpperCase();
+            String v2 = Hex.encodeHexString(v2Bytes).toUpperCase();
+            String v3 = Hex.encodeHexString(v3Bytes).toUpperCase();
+
+            sql(
+                    "INSERT INTO test_rbm64 VALUES "
+                            + "(1, CAST (NULL AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v1
+                            + "' AS VARBINARY)), "
+                            + "(3, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY))");
+
+            List<Row> result = queryAndSort("SELECT * FROM test_rbm64");
+            checkOneRecord(result.get(0), 1, null);
+            checkOneRecord(result.get(1), 2, v1Bytes);
+            checkOneRecord(result.get(2), 3, v2Bytes);
+
+            sql(
+                    "INSERT INTO test_rbm64 VALUES "
+                            + "(1, CAST (x'"
+                            + v1
+                            + "' AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY)), "
+                            + "(2, CAST (x'"
+                            + v2
+                            + "' AS VARBINARY)), "
+                            + "(3, CAST (x'"
+                            + v3
+                            + "' AS VARBINARY))");
+
+            result = queryAndSort("SELECT * FROM test_rbm64");
+            checkOneRecord(result.get(0), 1, v1Bytes);
+            checkOneRecord(result.get(1), 2, v4Bytes);
+            checkOneRecord(result.get(2), 3, v5Bytes);
+        }
+
+        private void checkOneRecord(Row row, int id, byte[] expected) {
+            assertThat(row.getField(0)).isEqualTo(id);
+            assertThat(row.getField(1)).isEqualTo(expected);
+        }
+    }
 }

Reply via email to