This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a55c4ac61 [iceberg] Introduce iceberg bucketing function (#1409)
a55c4ac61 is described below

commit a55c4ac61bdf76a75d42b13aae585306d76075ab
Author: Yang Guo <[email protected]>
AuthorDate: Thu Aug 7 19:36:31 2025 +0800

    [iceberg] Introduce iceberg bucketing function (#1409)
---
 .../alibaba/fluss/bucketing/BucketingFunction.java |   2 +
 .../fluss/bucketing/IcebergBucketingFunction.java  |  38 +++
 .../row/encode/iceberg/IcebergBinaryRowWriter.java |  69 +++---
 .../bucketing/IcebergBucketingFunctionTest.java    | 261 +++++++++++++++++++++
 .../encode/iceberg/IcebergBinaryRowWriterTest.java | 221 +++++++++++++++++
 .../row/encode/iceberg/IcebergKeyEncoderTest.java  | 249 +++-----------------
 6 files changed, 597 insertions(+), 243 deletions(-)

diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java 
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
index 8b06ae3c1..5fe3f6f57 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
@@ -41,6 +41,8 @@ public interface BucketingFunction {
             return new PaimonBucketingFunction();
         } else if (lakeFormat == DataLakeFormat.LANCE) {
             return new FlussBucketingFunction();
+        } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+            return new IcebergBucketingFunction();
         } else {
             throw new UnsupportedOperationException("Unsupported lake format: 
" + lakeFormat);
         }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
new file mode 100644
index 000000000..0325f3480
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.bucketing;
+
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.HashFunction;
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hashing;
+
+/** An implementation of {@link BucketingFunction} to follow Iceberg's 
bucketing strategy. */
+public class IcebergBucketingFunction implements BucketingFunction {
+
+    private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+    @Override
+    public int bucketing(byte[] bucketKey, int numBuckets) {
+        if (bucketKey == null || bucketKey.length == 0) {
+            throw new IllegalArgumentException("bucketKey must not be null or 
empty");
+        }
+        if (numBuckets <= 0) {
+            throw new IllegalArgumentException("numBuckets must be positive");
+        }
+        return (MURMUR3.hashBytes(bucketKey).asInt() & Integer.MAX_VALUE) % 
numBuckets;
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
index b18a9bfa0..4dd2ffeb9 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -27,36 +27,31 @@ import com.alibaba.fluss.utils.UnsafeUtils;
 import java.io.Serializable;
 import java.util.Arrays;
 
-import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
-
 /**
  * A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using 
Iceberg's binary
  * encoding format.
  *
- * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() 
implementation:
- * 
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
- *
- * <p>Key encoding principles from Iceberg's Conversions class:
+ * <p>The encoding logic is to encode different types of keys into bytes 
array, which will have the
+ * same hash value with iceberg's bucketing hash function.
  *
  * <ul>
- *   <li>All numeric types (int, long, float, double, timestamps) use 
LITTLE-ENDIAN byte order
+ *   <li>For int type, it is treated as a long value for encoding
+ *   <li>All numeric types (long, float, double, timestamps) use LITTLE-ENDIAN 
byte order
  *   <li>Decimal types use BIG-ENDIAN byte order
  *   <li>Strings are encoded as UTF-8 bytes
  *   <li>Timestamps are stored as long values (microseconds since epoch)
  * </ul>
  *
  * <p>Note: This implementation uses Fluss's MemorySegment instead of 
ByteBuffer for performance,
- * but maintains byte-level compatibility with Iceberg's encoding.
+ * but maintains byte-level compatibility.
  */
 class IcebergBinaryRowWriter {
 
-    private final int arity;
     private byte[] buffer;
     private MemorySegment segment;
     private int cursor;
 
     public IcebergBinaryRowWriter(int arity) {
-        this.arity = arity;
         // Conservative initial size to avoid frequent resizing
         int initialSize = 8 + (arity * 8);
         setBuffer(new byte[initialSize]);
@@ -64,11 +59,11 @@ class IcebergBinaryRowWriter {
     }
 
     public void reset() {
-        this.cursor = 0;
         // Clear only the used portion for efficiency
         if (cursor > 0) {
             Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
         }
+        this.cursor = 0;
     }
 
     public byte[] toBytes() {
@@ -77,13 +72,6 @@ class IcebergBinaryRowWriter {
         return result;
     }
 
-    public void setNullAt(int pos) {
-        // For Iceberg key encoding, null values should not occur
-        // This is validated at the encoder level
-        throw new UnsupportedOperationException(
-                "Null values are not supported in Iceberg key encoding");
-    }
-
     public void writeBoolean(boolean value) {
         ensureCapacity(1);
         UnsafeUtils.putBoolean(buffer, cursor, value);
@@ -108,6 +96,10 @@ class IcebergBinaryRowWriter {
         cursor += 4;
     }
 
+    public void writeIntAsLong(int value) {
+        writeLong(value);
+    }
+
     public void writeLong(long value) {
         ensureCapacity(8);
         UnsafeUtils.putLong(buffer, cursor, value);
@@ -127,26 +119,40 @@ class IcebergBinaryRowWriter {
     }
 
     public void writeString(BinaryString value) {
+        writeString(value, false);
+    }
+
+    public void writeString(BinaryString value, boolean skipEncodeLength) {
         // Convert to UTF-8 byte array
         byte[] bytes = BinaryString.encodeUTF8(value.toString());
-        // Write length prefix followed by UTF-8 bytes
-        writeInt(bytes.length); // 4-byte length prefix
-        ensureCapacity(bytes.length); // Ensure space for actual string bytes
-        segment.put(cursor, bytes, 0, bytes.length);
-        cursor += bytes.length;
+        writeByteArray(bytes, skipEncodeLength);
     }
 
     public void writeBytes(byte[] bytes) {
-        // Write length prefix followed by binary data
-        writeInt(bytes.length); // 4-byte length prefix
+        writeBytes(bytes, false);
+    }
+
+    public void writeBytes(byte[] bytes, boolean skipEncodeLength) {
+        writeByteArray(bytes, skipEncodeLength);
+    }
+
+    private void writeByteArray(byte[] bytes, boolean skipEncodeLength) {
+        if (!skipEncodeLength) {
+            // Write length prefix followed by binary data
+            writeInt(bytes.length); // 4-byte length prefix
+        }
         ensureCapacity(bytes.length); // Ensure space for actual binary bytes
         segment.put(cursor, bytes, 0, bytes.length);
         cursor += bytes.length;
     }
 
-    public void writeDecimal(Decimal value, int precision) {
+    public void writeDecimal(Decimal value) {
+        writeDecimal(value, false);
+    }
+
+    public void writeDecimal(Decimal value, boolean skipEncodeLength) {
         byte[] unscaled = value.toUnscaledBytes();
-        writeBytes(unscaled); // Adds 4-byte length prefix before the actual 
bytes
+        writeBytes(unscaled, skipEncodeLength); // Adds 4-byte length prefix 
before the actual bytes
     }
 
     private void ensureCapacity(int neededSize) {
@@ -178,7 +184,7 @@ class IcebergBinaryRowWriter {
         switch (fieldType.getTypeRoot()) {
             case INTEGER:
             case DATE:
-                return (writer, value) -> writer.writeInt((int) value);
+                return (writer, value) -> writer.writeIntAsLong((int) value);
 
             case TIME_WITHOUT_TIME_ZONE:
                 // Write time as microseconds long (milliseconds * 1000)
@@ -199,16 +205,15 @@ class IcebergBinaryRowWriter {
                 };
 
             case DECIMAL:
-                final int decimalPrecision = getPrecision(fieldType);
-                return (writer, value) -> writer.writeDecimal((Decimal) value, 
decimalPrecision);
+                return (writer, value) -> writer.writeDecimal((Decimal) value, 
true);
 
             case STRING:
             case CHAR:
-                return (writer, value) -> writer.writeString((BinaryString) 
value);
+                return (writer, value) -> writer.writeString((BinaryString) 
value, true);
 
             case BINARY:
             case BYTES:
-                return (writer, value) -> writer.writeBytes((byte[]) value);
+                return (writer, value) -> writer.writeBytes((byte[]) value, 
true);
 
             default:
                 throw new IllegalArgumentException(
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
new file mode 100644
index 000000000..278136aa5
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.bucketing;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link IcebergBucketingFunction}. */
+class IcebergBucketingFunctionTest {
+
+    @Test
+    void testIntegerHash() throws IOException {
+        int testValue = 42;
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new 
String[] {"id"});
+
+        GenericRow row = GenericRow.of(testValue);
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(testValue);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.IntegerType.get()).apply(testValue);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testLongHash() throws IOException {
+        long testValue = 1234567890123456789L;
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new 
String[] {"id"});
+
+        GenericRow row = GenericRow.of(testValue);
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(testValue);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.LongType.get()).apply(testValue);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testStringHash() throws IOException {
+        String testValue = "Hello Iceberg, Fluss this side!";
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new 
String[] {"name"});
+
+        GenericRow row = GenericRow.of(BinaryString.fromString(testValue));
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("name"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = testValue.getBytes(StandardCharsets.UTF_8);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.StringType.get()).apply(testValue);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testDecimalHash() throws IOException {
+        BigDecimal testValue = new BigDecimal("123.45");
+        Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
+        int bucketNum = 10;
+
+        RowType rowType =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new 
String[] {"amount"});
+
+        GenericRow row = GenericRow.of(decimal);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("amount"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = testValue.unscaledValue().toByteArray();
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = transform.bind(Types.DecimalType.of(10, 
2)).apply(testValue);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testTimestampEncodingHash() throws IOException {
+        // Iceberg expects microseconds for TIMESTAMP type
+        long millis = 1698235273182L;
+        int nanos = 123456;
+        long micros = millis * 1000 + (nanos / 1000);
+        TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos);
+        int bucketNum = 10;
+
+        RowType rowType =
+                RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new 
String[] {"event_time"});
+
+        GenericRow row = GenericRow.of(testValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("event_time"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(micros);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.TimestampType.withoutZone()).apply(micros);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testDateHash() throws IOException {
+        int dateValue = 19655;
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new 
String[] {"date"});
+        GenericRow row = GenericRow.of(dateValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("date"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(dateValue);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.DateType.get()).apply(dateValue);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testTimeHashing() throws IOException {
+        // Fluss stores time as int (milliseconds since midnight)
+        int timeMillis = 34200000;
+        long timeMicros = timeMillis * 1000L; // Convert to microseconds for 
Iceberg
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new 
String[] {"time"});
+
+        GenericRow row = GenericRow.of(timeMillis);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("time"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        byte[] equivalentBytes = toBytes(timeMicros);
+        assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        int icebergBucket = 
transform.bind(Types.TimeType.get()).apply(timeMicros);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    @Test
+    void testBinaryEncoding() throws IOException {
+        byte[] testValue = "Hello i only understand binary data".getBytes();
+        int bucketNum = 10;
+
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new 
String[] {"data"});
+
+        GenericRow row = GenericRow.of(testValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("data"));
+
+        // Encode with our implementation
+        byte[] ourEncodedKey = encoder.encodeKey(row);
+        assertThat(ourEncodedKey).isEqualTo(testValue);
+
+        Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(testValue);
+        int icebergBucket = 
transform.bind(Types.BinaryType.get()).apply(byteBuffer);
+
+        IcebergBucketingFunction icebergBucketingFunction = new 
IcebergBucketingFunction();
+        int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey, 
bucketNum);
+
+        assertThat(ourBucket).isEqualTo(icebergBucket);
+    }
+
+    private byte[] toBytes(long value) {
+        return 
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
+    }
+}
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
new file mode 100644
index 000000000..8d2109301
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link IcebergBinaryRowWriter}. */
+class IcebergBinaryRowWriterTest {
+
+    @Test
+    void testWriteBoolean() {
+        boolean value = false;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeBoolean(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1);
+        buffer.put((byte) (value ? 1 : 0));
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteByte() {
+        byte value = 1;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeByte(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1);
+        buffer.put((byte) (value));
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteShort() {
+        short value = 20;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeShort(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        buffer.put((byte) value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteInt() {
+        int value = 1234;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeInt(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putInt(value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteIntAsLong() {
+        int value = 1234;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeIntAsLong(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putLong(value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteLong() {
+        long value = 123456899122345678L;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeLong(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putLong(value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteFloat() {
+        float value = 3.14f;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeFloat(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putFloat(value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteDouble() {
+        double value = 3.1415926;
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeDouble(value);
+
+        byte[] writerBytes = writer.toBytes();
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putDouble(value);
+        byte[] expectedBytes = buffer.array();
+        assertThat(writerBytes).isEqualTo(expectedBytes);
+    }
+
+    @Test
+    void testWriteString() {
+        String value = "Hello World!";
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeString(BinaryString.fromString(value));
+        byte[] writerBytes = writer.toBytes();
+        byte[] strBytes = value.getBytes(StandardCharsets.UTF_8);
+
+        int length = ByteBuffer.wrap(writerBytes, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+        assertThat(length).isEqualTo(strBytes.length);
+        assertThat(actualContent).isEqualTo(strBytes);
+
+        writer.reset();
+
+        writer.writeString(BinaryString.fromString(value), true);
+        byte[] writerBytesWithoutPrefix = writer.toBytes();
+        assertThat(writerBytesWithoutPrefix).isEqualTo(strBytes);
+    }
+
+    @Test
+    void testWriteBytes() {
+        byte[] value = new byte[] {12, 23, 34, 45};
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeBytes(value);
+        byte[] writerBytes = writer.toBytes();
+
+        int length = ByteBuffer.wrap(writerBytes, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+        assertThat(length).isEqualTo(value.length);
+        assertThat(actualContent).isEqualTo(value);
+
+        writer.reset();
+
+        writer.writeBytes(value, true);
+        byte[] writerBytesWithoutPrefix = writer.toBytes();
+        assertThat(writerBytesWithoutPrefix).isEqualTo(value);
+    }
+
+    @Test
+    void testWriteDecimal() {
+        BigDecimal bigDecimal = new BigDecimal("123.45");
+        Decimal value = Decimal.fromBigDecimal(bigDecimal, 10, 2);
+
+        IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+        writer.writeDecimal(value);
+        byte[] writerBytes = writer.toBytes();
+        byte[] expectedBytes = bigDecimal.unscaledValue().toByteArray();
+
+        int length = ByteBuffer.wrap(writerBytes, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+        assertThat(length).isEqualTo(expectedBytes.length);
+        assertThat(actualContent).isEqualTo(expectedBytes);
+
+        writer.reset();
+
+        writer.writeDecimal(value, true);
+        byte[] writerBytesWithoutPrefix = writer.toBytes();
+        assertThat(writerBytesWithoutPrefix).isEqualTo(expectedBytes);
+    }
+}
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
index 2934f85e8..14f0cf1b6 100644
--- 
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
@@ -19,16 +19,13 @@ package com.alibaba.fluss.row.encode.iceberg;
 
 import com.alibaba.fluss.row.BinaryString;
 import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.TimestampNtz;
-import com.alibaba.fluss.row.indexed.IndexedRow;
-import com.alibaba.fluss.row.indexed.IndexedRowWriter;
 import com.alibaba.fluss.types.DataType;
 import com.alibaba.fluss.types.DataTypes;
 import com.alibaba.fluss.types.RowType;
 
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 
@@ -36,19 +33,14 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 
-import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/**
- * Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches 
Iceberg's format.
- *
- * <p>This test uses Iceberg's actual Conversions class to ensure our encoding 
is byte-for-byte
- * compatible with Iceberg's implementation.
- */
+/** Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches 
Iceberg's format. */
 class IcebergKeyEncoderTest {
 
     @Test
@@ -73,17 +65,15 @@ class IcebergKeyEncoderTest {
         RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new 
String[] {"id"});
 
         int testValue = 42;
-        IndexedRow row = createRowWithInt(testValue);
+        GenericRow row = GenericRow.of(testValue);
         IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
 
-        // Encode with Iceberg's implementation
-        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.IntegerType.get(), testValue);
-        byte[] icebergEncoded = toByteArray(icebergBuffer);
-
-        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(testValue);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -91,12 +81,16 @@ class IcebergKeyEncoderTest {
         RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new 
String[] {"id"});
 
         long testValue = 1234567890123456789L;
-        IndexedRow row = createRowWithLong(testValue);
+        GenericRow row = GenericRow.of(testValue);
         IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
 
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(testValue);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
+
         // Encode with Iceberg's implementation
         ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.LongType.get(), testValue);
         byte[] icebergEncoded = toByteArray(icebergBuffer);
@@ -109,24 +103,16 @@ class IcebergKeyEncoderTest {
         RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new 
String[] {"name"});
 
         String testValue = "Hello Iceberg, Fluss this side!";
-        IndexedRow row = createRowWithString(testValue);
+        GenericRow row = GenericRow.of(BinaryString.fromString(testValue));
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("name"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
 
-        // Decode length prefix
-        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
-        byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
-        // Encode with Iceberg's Conversions
-        byte[] expectedContent =
-                toByteArray(Conversions.toByteBuffer(Types.StringType.get(), 
testValue));
-
-        // Validate length and content
-        assertThat(length).isEqualTo(expectedContent.length);
-        assertThat(actualContent).isEqualTo(expectedContent);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = testValue.getBytes(StandardCharsets.UTF_8);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -135,25 +121,17 @@ class IcebergKeyEncoderTest {
                 RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new 
String[] {"amount"});
 
         BigDecimal testValue = new BigDecimal("123.45");
-        IndexedRow row = createRowWithDecimal(testValue, 10, 2);
+        Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
+        GenericRow row = GenericRow.of(decimal);
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("amount"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
 
-        // Extract the decimal length prefix and bytes from ourEncoded
-        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
-        byte[] actualDecimal = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
-        // Encode the same value with Iceberg's implementation (no prefix)
-        Type.PrimitiveType decimalType = Types.DecimalType.of(10, 2);
-        ByteBuffer icebergBuffer = Conversions.toByteBuffer(decimalType, 
testValue);
-        byte[] icebergEncoded = toByteArray(icebergBuffer);
-
-        // Validate that our content (excluding the prefix) matches Iceberg's 
encoding
-        assertThat(length).isEqualTo(icebergEncoded.length);
-        assertThat(actualDecimal).isEqualTo(icebergEncoded);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = testValue.unscaledValue().toByteArray();
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -166,19 +144,16 @@ class IcebergKeyEncoderTest {
         int nanos = 123456;
         long micros = millis * 1000 + (nanos / 1000);
 
-        IndexedRow row = createRowWithTimestampNtz(millis, nanos);
+        TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos);
+        GenericRow row = GenericRow.of(testValue);
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("event_time"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
-
-        // Encode with Iceberg's implementation
-        ByteBuffer icebergBuffer =
-                Conversions.toByteBuffer(Types.TimestampType.withoutZone(), 
micros);
-        byte[] icebergEncoded = toByteArray(icebergBuffer);
-
-        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(micros);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -187,18 +162,16 @@ class IcebergKeyEncoderTest {
 
         // Date value as days since epoch
         int dateValue = 19655; // 2023-10-25
-        IndexedRow row = createRowWithDate(dateValue);
+        GenericRow row = GenericRow.of(dateValue);
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("date"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
 
-        // Encode with Iceberg's implementation
-        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.DateType.get(), dateValue);
-        byte[] icebergEncoded = toByteArray(icebergBuffer);
-
-        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+        // This is the equivalent bytes array which the key should be encoded 
to.
+        byte[] equivalentBytes = toBytes(dateValue);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -209,18 +182,15 @@ class IcebergKeyEncoderTest {
         int timeMillis = 34200000;
         long timeMicros = timeMillis * 1000L; // Convert to microseconds for 
Iceberg
 
-        IndexedRow row = createRowWithTime(timeMillis);
+        GenericRow row = GenericRow.of(timeMillis);
+
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("time"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
-
-        // Encode with Iceberg's implementation (expects microseconds as long)
-        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.TimeType.get(), timeMicros);
-        byte[] icebergEncoded = toByteArray(icebergBuffer);
-
-        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+        byte[] equivalentBytes = toBytes(timeMicros);
+        assertThat(ourEncoded).isEqualTo(equivalentBytes);
     }
 
     @Test
@@ -228,25 +198,13 @@ class IcebergKeyEncoderTest {
         RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new 
String[] {"data"});
 
         byte[] testValue = "Hello i only understand binary data".getBytes();
-        IndexedRow row = createRowWithBytes(testValue);
+        GenericRow row = GenericRow.of(testValue);
         IcebergKeyEncoder encoder =
                 new IcebergKeyEncoder(rowType, 
Collections.singletonList("data"));
 
         // Encode with our implementation
         byte[] ourEncoded = encoder.encodeKey(row);
-
-        // Decode length prefix
-        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
-        byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
-        // Encode using Iceberg's Conversions (input should be ByteBuffer)
-        ByteBuffer icebergBuffer =
-                Conversions.toByteBuffer(Types.BinaryType.get(), 
ByteBuffer.wrap(testValue));
-        byte[] expectedContent = toByteArray(icebergBuffer);
-
-        // Validate length and content
-        assertThat(length).isEqualTo(expectedContent.length);
-        assertThat(actualContent).isEqualTo(expectedContent);
+        assertThat(ourEncoded).isEqualTo(testValue);
     }
 
     // Helper method to convert ByteBuffer to byte array
@@ -256,138 +214,7 @@ class IcebergKeyEncoderTest {
         return array;
     }
 
-    // ---- Helper methods to create IndexedRow instances ----
-
-    private IndexedRow createRowWithInt(int value) throws IOException {
-        DataType[] dataTypes = {DataTypes.INT()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeInt(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithLong(long value) throws IOException {
-        DataType[] dataTypes = {DataTypes.BIGINT()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeLong(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithString(String value) throws IOException {
-        DataType[] dataTypes = {DataTypes.STRING()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeString(BinaryString.fromString(value));
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithBoolean(boolean value) throws IOException {
-        DataType[] dataTypes = {DataTypes.BOOLEAN()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeBoolean(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithFloat(float value) throws IOException {
-        DataType[] dataTypes = {DataTypes.FLOAT()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeFloat(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithDouble(double value) throws IOException {
-        DataType[] dataTypes = {DataTypes.DOUBLE()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeDouble(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithDecimal(BigDecimal value, int precision, 
int scale)
-            throws IOException {
-        DataType[] dataTypes = {DataTypes.DECIMAL(precision, scale)};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeDecimal(Decimal.fromBigDecimal(value, precision, 
scale), precision);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithTimestampNtz(long millis, int nanos) 
throws IOException {
-        DataType[] dataTypes = {DataTypes.TIMESTAMP(6)};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 
6);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithTimestampLtz(long millis, int nanos) 
throws IOException {
-        DataType[] dataTypes = {DataTypes.TIMESTAMP_LTZ(6)};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis, 
nanos), 6);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithDate(int days) throws IOException {
-        DataType[] dataTypes = {DataTypes.DATE()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeInt(days);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithTime(int millis) throws IOException {
-        DataType[] dataTypes = {DataTypes.TIME()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeInt(millis); // Fluss stores TIME as int (milliseconds)
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            writer.close();
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithBytes(byte[] value) throws IOException {
-        DataType[] dataTypes = {DataTypes.BYTES()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeBytes(value);
-            IndexedRow row = new IndexedRow(dataTypes);
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
-    }
-
-    private IndexedRow createRowWithTimestampNtz(long millis, int nanos, 
DataType type)
-            throws IOException {
-        DataType[] dataTypes = {DataTypes.BYTES()};
-        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
-            writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 
getPrecision(type));
-            IndexedRow row = new IndexedRow(new DataType[] {type});
-            row.pointTo(writer.segment(), 0, writer.position());
-            return row;
-        }
+    private byte[] toBytes(long value) {
+        return 
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
     }
 }


Reply via email to