This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b492c290b [lake/iceberg] Iceberg encoding strategy (#1350)
b492c290b is described below

commit b492c290ba98f5545db75eecea7427fb100ed776
Author: MehulBatra <[email protected]>
AuthorDate: Tue Jul 22 16:28:32 2025 +0530

    [lake/iceberg] Iceberg encoding strategy (#1350)
    
    ---------
    Co-authored-by: luoyuxia <[email protected]>
---
 fluss-common/pom.xml                               |   6 +
 .../com/alibaba/fluss/metadata/DataLakeFormat.java |   3 +-
 .../com/alibaba/fluss/row/encode/KeyEncoder.java   |   3 +
 .../row/encode/iceberg/IcebergBinaryRowWriter.java | 223 ++++++++++++
 .../row/encode/iceberg/IcebergKeyEncoder.java      |  70 ++++
 .../row/encode/iceberg/IcebergKeyEncoderTest.java  | 393 +++++++++++++++++++++
 fluss-lake/fluss-lake-iceberg/pom.xml              |  20 +-
 .../src/main/resources/META-INF/NOTICE             |   2 +-
 fluss-test-coverage/pom.xml                        |  37 +-
 pom.xml                                            |   4 +-
 10 files changed, 736 insertions(+), 25 deletions(-)

diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml
index 181246794..e6cf4922c 100644
--- a/fluss-common/pom.xml
+++ b/fluss-common/pom.xml
@@ -103,6 +103,12 @@
             <version>${paimon.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-api</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java 
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
index 624aa1267..b014ee814 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
@@ -19,7 +19,8 @@ package com.alibaba.fluss.metadata;
 
 /** An enum for datalake format. */
 public enum DataLakeFormat {
-    PAIMON("paimon");
+    PAIMON("paimon"),
+    ICEBERG("iceberg");
 
     private final String value;
 
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
index 044c8a319..da3306455 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
@@ -19,6 +19,7 @@ package com.alibaba.fluss.row.encode;
 
 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
 import com.alibaba.fluss.row.encode.paimon.PaimonKeyEncoder;
 import com.alibaba.fluss.types.RowType;
 
@@ -46,6 +47,8 @@ public interface KeyEncoder {
             return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
         } else if (lakeFormat == DataLakeFormat.PAIMON) {
             return new PaimonKeyEncoder(rowType, keyFields);
+        } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+            return new IcebergKeyEncoder(rowType, keyFields);
         } else {
             throw new UnsupportedOperationException("Unsupported datalake 
format: " + lakeFormat);
         }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
new file mode 100644
index 000000000..b18a9bfa0
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.memory.MemorySegment;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.utils.UnsafeUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
+
+/**
+ * A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using 
Iceberg's binary
+ * encoding format.
+ *
+ * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() 
implementation:
+ * 
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
+ *
+ * <p>Key encoding principles from Iceberg's Conversions class:
+ *
+ * <ul>
+ *   <li>All numeric types (int, long, float, double, timestamps) use 
LITTLE-ENDIAN byte order
+ *   <li>Decimal types use BIG-ENDIAN byte order
+ *   <li>Strings are encoded as UTF-8 bytes
+ *   <li>Timestamps are stored as long values (microseconds since epoch)
+ * </ul>
+ *
+ * <p>Note: This implementation uses Fluss's MemorySegment instead of 
ByteBuffer for performance,
+ * but maintains byte-level compatibility with Iceberg's encoding.
+ */
+class IcebergBinaryRowWriter {
+
+    private final int arity;
+    private byte[] buffer;
+    private MemorySegment segment;
+    private int cursor;
+
+    public IcebergBinaryRowWriter(int arity) {
+        this.arity = arity;
+        // Conservative initial size to avoid frequent resizing
+        int initialSize = 8 + (arity * 8);
+        setBuffer(new byte[initialSize]);
+        reset();
+    }
+
+    public void reset() {
+        this.cursor = 0;
+        // NOTE(review): cursor was just set to 0 above, so this guard is always false and the fill never runs — confirm whether clearing the buffer is still required
+        if (cursor > 0) {
+            Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
+        }
+    }
+
+    public byte[] toBytes() {
+        byte[] result = new byte[cursor];
+        System.arraycopy(buffer, 0, result, 0, cursor);
+        return result;
+    }
+
+    public void setNullAt(int pos) {
+        // For Iceberg key encoding, null values should not occur
+        // This is validated at the encoder level
+        throw new UnsupportedOperationException(
+                "Null values are not supported in Iceberg key encoding");
+    }
+
+    public void writeBoolean(boolean value) {
+        ensureCapacity(1);
+        UnsafeUtils.putBoolean(buffer, cursor, value);
+        cursor += 1;
+    }
+
+    public void writeByte(byte value) {
+        ensureCapacity(1);
+        UnsafeUtils.putByte(buffer, cursor, value);
+        cursor += 1;
+    }
+
+    public void writeShort(short value) {
+        ensureCapacity(2);
+        UnsafeUtils.putShort(buffer, cursor, value);
+        cursor += 2;
+    }
+
+    public void writeInt(int value) {
+        ensureCapacity(4);
+        UnsafeUtils.putInt(buffer, cursor, value);
+        cursor += 4;
+    }
+
+    public void writeLong(long value) {
+        ensureCapacity(8);
+        UnsafeUtils.putLong(buffer, cursor, value);
+        cursor += 8;
+    }
+
+    public void writeFloat(float value) {
+        ensureCapacity(4);
+        UnsafeUtils.putFloat(buffer, cursor, value);
+        cursor += 4;
+    }
+
+    public void writeDouble(double value) {
+        ensureCapacity(8);
+        UnsafeUtils.putDouble(buffer, cursor, value);
+        cursor += 8;
+    }
+
+    public void writeString(BinaryString value) {
+        // Convert to UTF-8 byte array
+        byte[] bytes = BinaryString.encodeUTF8(value.toString());
+        // Write length prefix followed by UTF-8 bytes
+        writeInt(bytes.length); // 4-byte length prefix
+        ensureCapacity(bytes.length); // Ensure space for actual string bytes
+        segment.put(cursor, bytes, 0, bytes.length);
+        cursor += bytes.length;
+    }
+
+    public void writeBytes(byte[] bytes) {
+        // Write length prefix followed by binary data
+        writeInt(bytes.length); // 4-byte length prefix
+        ensureCapacity(bytes.length); // Ensure space for actual binary bytes
+        segment.put(cursor, bytes, 0, bytes.length);
+        cursor += bytes.length;
+    }
+
+    public void writeDecimal(Decimal value, int precision) {
+        byte[] unscaled = value.toUnscaledBytes();
+        writeBytes(unscaled); // adds a 4-byte length prefix before the actual 
bytes; NOTE(review): the precision parameter is currently unused — confirm intent
+    }
+
+    private void ensureCapacity(int neededSize) {
+        if (buffer.length < cursor + neededSize) {
+            grow(cursor + neededSize);
+        }
+    }
+
+    private void grow(int minCapacity) {
+        int oldCapacity = buffer.length;
+        int newCapacity = oldCapacity + (oldCapacity >> 1); // 1.5x growth
+        if (newCapacity < minCapacity) {
+            newCapacity = minCapacity;
+        }
+        setBuffer(Arrays.copyOf(buffer, newCapacity));
+    }
+
+    private void setBuffer(byte[] buffer) {
+        this.buffer = buffer;
+        this.segment = MemorySegment.wrap(buffer);
+    }
+
+    /**
+     * Creates an accessor for writing the elements of an iceberg binary row 
writer during runtime.
+     *
+     * @param fieldType the field type to write
+     */
+    public static FieldWriter createFieldWriter(DataType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case INTEGER:
+            case DATE:
+                return (writer, value) -> writer.writeInt((int) value);
+
+            case TIME_WITHOUT_TIME_ZONE:
+                // Write time as microseconds long (milliseconds * 1000)
+                return (writer, value) -> {
+                    int millis = (int) value;
+                    long micros = millis * 1000L;
+                    writer.writeLong(micros);
+                };
+
+            case BIGINT:
+                return (writer, value) -> writer.writeLong((long) value);
+                // TODO: revisit nanosecond support after #1195 is merged
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (writer, value) -> {
+                    TimestampNtz ts = (TimestampNtz) value;
+                    long micros = ts.getMillisecond() * 1000L + 
(ts.getNanoOfMillisecond() / 1000L);
+                    writer.writeLong(micros);
+                };
+
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                return (writer, value) -> writer.writeDecimal((Decimal) value, 
decimalPrecision);
+
+            case STRING:
+            case CHAR:
+                return (writer, value) -> writer.writeString((BinaryString) 
value);
+
+            case BINARY:
+            case BYTES:
+                return (writer, value) -> writer.writeBytes((byte[]) value);
+
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported type for Iceberg binary row writer: " + 
fieldType);
+        }
+    }
+
+    /** Accessor for writing the elements of an iceberg binary row writer 
during runtime. */
+    interface FieldWriter extends Serializable {
+        void writeField(IcebergBinaryRowWriter writer, Object value);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
new file mode 100644
index 000000000..e8df7a0af
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.KeyEncoder;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.RowType;
+
+import java.util.List;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+
+/** An implementation of {@link KeyEncoder} to follow Iceberg's encoding 
strategy. */
+public class IcebergKeyEncoder implements KeyEncoder {
+
+    private final InternalRow.FieldGetter[] fieldGetters;
+
+    private final IcebergBinaryRowWriter.FieldWriter[] fieldEncoders;
+
+    private final IcebergBinaryRowWriter icebergBinaryRowWriter;
+
+    public IcebergKeyEncoder(RowType rowType, List<String> keys) {
+
+        // Validate single key field requirement as per FIP
+        checkArgument(
+                keys.size() == 1,
+                "Key fields must have exactly one field for iceberg format, 
but got: %s",
+                keys);
+
+        // for get fields from fluss internal row
+        fieldGetters = new InternalRow.FieldGetter[keys.size()];
+        // for encode fields into iceberg
+        fieldEncoders = new IcebergBinaryRowWriter.FieldWriter[keys.size()];
+        for (int i = 0; i < keys.size(); i++) {
+            int keyIndex = rowType.getFieldIndex(keys.get(i));
+            DataType keyDataType = rowType.getTypeAt(keyIndex);
+            fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, 
keyIndex);
+            fieldEncoders[i] = 
IcebergBinaryRowWriter.createFieldWriter(keyDataType);
+        }
+
+        icebergBinaryRowWriter = new IcebergBinaryRowWriter(keys.size());
+    }
+
+    @Override
+    public byte[] encodeKey(InternalRow row) {
+        icebergBinaryRowWriter.reset();
+        // iterate all the fields of the row, and encode each field
+        for (int i = 0; i < fieldGetters.length; i++) {
+            fieldEncoders[i].writeField(
+                    icebergBinaryRowWriter, 
fieldGetters[i].getFieldOrNull(row));
+        }
+        return icebergBinaryRowWriter.toBytes();
+    }
+}
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
new file mode 100644
index 000000000..2934f85e8
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.row.indexed.IndexedRow;
+import com.alibaba.fluss.row.indexed.IndexedRowWriter;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches 
Iceberg's format.
+ *
+ * <p>This test uses Iceberg's actual Conversions class to ensure our encoding 
is byte-for-byte
+ * compatible with Iceberg's implementation.
+ */
+class IcebergKeyEncoderTest {
+
+    @Test
+    void testSingleKeyFieldRequirement() {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"id", "name"});
+
+        // Should succeed with single key
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+        assertThat(encoder).isNotNull();
+
+        // Should fail with multiple keys
+        assertThatThrownBy(() -> new IcebergKeyEncoder(rowType, 
Arrays.asList("id", "name")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key fields must have exactly one 
field");
+    }
+
+    @Test
+    void testIntegerEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new 
String[] {"id"});
+
+        int testValue = 42;
+        IndexedRow row = createRowWithInt(testValue);
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Encode with Iceberg's implementation
+        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.IntegerType.get(), testValue);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testLongEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new 
String[] {"id"});
+
+        long testValue = 1234567890123456789L;
+        IndexedRow row = createRowWithLong(testValue);
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Encode with Iceberg's implementation
+        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.LongType.get(), testValue);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testStringEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new 
String[] {"name"});
+
+        String testValue = "Hello Iceberg, Fluss this side!";
+        IndexedRow row = createRowWithString(testValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("name"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Decode length prefix
+        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+        // Encode with Iceberg's Conversions
+        byte[] expectedContent =
+                toByteArray(Conversions.toByteBuffer(Types.StringType.get(), 
testValue));
+
+        // Validate length and content
+        assertThat(length).isEqualTo(expectedContent.length);
+        assertThat(actualContent).isEqualTo(expectedContent);
+    }
+
+    @Test
+    void testDecimalEncoding() throws IOException {
+        RowType rowType =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new 
String[] {"amount"});
+
+        BigDecimal testValue = new BigDecimal("123.45");
+        IndexedRow row = createRowWithDecimal(testValue, 10, 2);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("amount"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Extract the decimal length prefix and bytes from ourEncoded
+        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualDecimal = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+        // Encode the same value with Iceberg's implementation (no prefix)
+        Type.PrimitiveType decimalType = Types.DecimalType.of(10, 2);
+        ByteBuffer icebergBuffer = Conversions.toByteBuffer(decimalType, 
testValue);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        // Validate that our content (excluding the prefix) matches Iceberg's 
encoding
+        assertThat(length).isEqualTo(icebergEncoded.length);
+        assertThat(actualDecimal).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testTimestampEncoding() throws IOException {
+        RowType rowType =
+                RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new 
String[] {"event_time"});
+
+        // Iceberg expects microseconds for TIMESTAMP type
+        long millis = 1698235273182L;
+        int nanos = 123456;
+        long micros = millis * 1000 + (nanos / 1000);
+
+        IndexedRow row = createRowWithTimestampNtz(millis, nanos);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("event_time"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Encode with Iceberg's implementation
+        ByteBuffer icebergBuffer =
+                Conversions.toByteBuffer(Types.TimestampType.withoutZone(), 
micros);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testDateEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new 
String[] {"date"});
+
+        // Date value as days since epoch
+        int dateValue = 19655; // 2023-10-25
+        IndexedRow row = createRowWithDate(dateValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("date"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Encode with Iceberg's implementation
+        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.DateType.get(), dateValue);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testTimeEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new 
String[] {"time"});
+
+        // Fluss stores time as int (milliseconds since midnight)
+        int timeMillis = 34200000;
+        long timeMicros = timeMillis * 1000L; // Convert to microseconds for 
Iceberg
+
+        IndexedRow row = createRowWithTime(timeMillis);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("time"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Encode with Iceberg's implementation (expects microseconds as long)
+        ByteBuffer icebergBuffer = 
Conversions.toByteBuffer(Types.TimeType.get(), timeMicros);
+        byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+        assertThat(ourEncoded).isEqualTo(icebergEncoded);
+    }
+
+    @Test
+    void testBinaryEncoding() throws IOException {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new 
String[] {"data"});
+
+        byte[] testValue = "Hello i only understand binary data".getBytes();
+        IndexedRow row = createRowWithBytes(testValue);
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("data"));
+
+        // Encode with our implementation
+        byte[] ourEncoded = encoder.encodeKey(row);
+
+        // Decode length prefix
+        int length = ByteBuffer.wrap(ourEncoded, 0, 
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+        // Encode using Iceberg's Conversions (input should be ByteBuffer)
+        ByteBuffer icebergBuffer =
+                Conversions.toByteBuffer(Types.BinaryType.get(), 
ByteBuffer.wrap(testValue));
+        byte[] expectedContent = toByteArray(icebergBuffer);
+
+        // Validate length and content
+        assertThat(length).isEqualTo(expectedContent.length);
+        assertThat(actualContent).isEqualTo(expectedContent);
+    }
+
+    // Helper method to convert ByteBuffer to byte array
+    private byte[] toByteArray(ByteBuffer buffer) {
+        byte[] array = new byte[buffer.remaining()];
+        buffer.get(array);
+        return array;
+    }
+
+    // ---- Helper methods to create IndexedRow instances ----
+
+    private IndexedRow createRowWithInt(int value) throws IOException {
+        DataType[] dataTypes = {DataTypes.INT()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeInt(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithLong(long value) throws IOException {
+        DataType[] dataTypes = {DataTypes.BIGINT()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeLong(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithString(String value) throws IOException {
+        DataType[] dataTypes = {DataTypes.STRING()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeString(BinaryString.fromString(value));
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithBoolean(boolean value) throws IOException {
+        DataType[] dataTypes = {DataTypes.BOOLEAN()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeBoolean(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithFloat(float value) throws IOException {
+        DataType[] dataTypes = {DataTypes.FLOAT()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeFloat(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithDouble(double value) throws IOException {
+        DataType[] dataTypes = {DataTypes.DOUBLE()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeDouble(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithDecimal(BigDecimal value, int precision, 
int scale)
+            throws IOException {
+        DataType[] dataTypes = {DataTypes.DECIMAL(precision, scale)};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeDecimal(Decimal.fromBigDecimal(value, precision, 
scale), precision);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithTimestampNtz(long millis, int nanos) 
throws IOException {
+        DataType[] dataTypes = {DataTypes.TIMESTAMP(6)};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 
6);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithTimestampLtz(long millis, int nanos) 
throws IOException {
+        DataType[] dataTypes = {DataTypes.TIMESTAMP_LTZ(6)};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis, 
nanos), 6);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithDate(int days) throws IOException {
+        DataType[] dataTypes = {DataTypes.DATE()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeInt(days);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithTime(int millis) throws IOException {
+        DataType[] dataTypes = {DataTypes.TIME()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeInt(millis); // Fluss stores TIME as int (milliseconds)
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            writer.close();
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithBytes(byte[] value) throws IOException {
+        DataType[] dataTypes = {DataTypes.BYTES()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeBytes(value);
+            IndexedRow row = new IndexedRow(dataTypes);
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+
+    private IndexedRow createRowWithTimestampNtz(long millis, int nanos, 
DataType type)
+            throws IOException {
+        DataType[] dataTypes = {DataTypes.BYTES()};
+        try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+            writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 
getPrecision(type));
+            IndexedRow row = new IndexedRow(new DataType[] {type});
+            row.pointTo(writer.segment(), 0, writer.position());
+            return row;
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 4218a4b25..2869bb799 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -32,10 +32,6 @@
 
     <packaging>jar</packaging>
 
-    <properties>
-        <iceberg.version>1.9.1</iceberg.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
@@ -69,14 +65,14 @@
                                 </includes>
                             </artifactSet>
                             <filters>
-                                 <filter>
-                                     <artifact>*</artifact>
-                                     <excludes>
-                                         <exclude>LICENSE</exclude>
-                                         <exclude>NOTICE</exclude>
-                                     </excludes>
-                                 </filter>
-                             </filters>
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>LICENSE</exclude>
+                                        <exclude>NOTICE</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE 
b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
index 71f037746..20b34f83d 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.iceberg:iceberg-core:1.9.1
+- org.apache.iceberg:iceberg-core:1.4.3
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index a661722e6..97c35e6eb 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -246,6 +246,7 @@
                                             
com.alibaba.fluss.row.columnar.BytesColumnVector.Bytes
                                         </exclude>
                                         
<exclude>com.alibaba.fluss.row.encode.RowEncoder</exclude>
+                                        
<exclude>com.alibaba.fluss.row.encode.KeyEncoder</exclude>
                                         
<exclude>com.alibaba.fluss.table.*</exclude>
                                         
<exclude>com.alibaba.fluss.record.*</exclude>
                                         
<exclude>com.alibaba.fluss.kv.*</exclude>
@@ -269,7 +270,9 @@
                                         
<exclude>com.alibaba.fluss.Bucket</exclude>
                                         
<exclude>com.alibaba.fluss.remote.*</exclude>
                                         
<exclude>com.alibaba.fluss.compression.*</exclude>
-                                        
<exclude>com.alibaba.fluss.security.auth.sasl.plain.PlainSaslServer.PlainSaslServerFactory</exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.security.auth.sasl.plain.PlainSaslServer.PlainSaslServerFactory
+                                        </exclude>
                                         
<exclude>com.alibaba.fluss.security.auth.ServerAuthenticator</exclude>
                                         <!-- start exclude for flink-connector 
-->
                                         
<exclude>com.alibaba.fluss.flink.utils.*</exclude>
@@ -307,7 +310,8 @@
                                         
<exclude>com.alibaba.fluss.fs.oss.*</exclude>
                                         
<exclude>com.alibaba.fluss.fs.s3.*</exclude>
                                         
<exclude>com.alibaba.fluss.fs.obs.*</exclude>
-                                        
<exclude>com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser*</exclude>
+                                        
<exclude>com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser*
+                                        </exclude>
                                         
<exclude>com.alibaba.fluss.rocksdb.RocksIteratorWrapper
                                         </exclude>
                                         
<exclude>com.alibaba.fluss.plugin.PluginUtils</exclude>
@@ -331,15 +335,28 @@
                                         
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSource.Builder</exclude>
                                         
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSource</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.source.TieringWriterInitContext</exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
+                                        </exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper
+                                        </exclude>
+                                        
<exclude>com.alibaba.fluss.flink.tiering.source.TieringWriterInitContext
+                                        </exclude>
                                         
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSourceReader</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultEmitter</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo*</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.committer.TieringCommitOperatorFactory</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory</exclude>
+                                        
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultEmitter
+                                        </exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo*
+                                        </exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperatorFactory
+                                        </exclude>
+                                        
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
+                                        </exclude>
+                                        <exclude>
+                                            
com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory
+                                        </exclude>
                                         
<exclude>com.alibaba.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude>
                                         <!-- end exclude for flink tiering 
service -->
                                     </excludes>
diff --git a/pom.xml b/pom.xml
index 87af19cfc..283239dcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,8 @@
         <netty.version>4.1.104</netty.version>
         <arrow.version>15.0.0</arrow.version>
         <paimon.version>1.2.0</paimon.version>
+        <!--    todo: Revisit to add support as per Iceberg 1.9.1 post #1195 
merge for Java 11 support-->
+        <iceberg.version>1.4.3</iceberg.version>
 
         <fluss.hadoop.version>2.10.2</fluss.hadoop.version>
         <frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>
@@ -550,7 +552,7 @@
                             under the License.
                         -->
                         <license 
implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
-                            <licenseFamilyCategory>AL2 </licenseFamilyCategory>
+                            <licenseFamilyCategory>AL2</licenseFamilyCategory>
                             <licenseFamilyName>Apache License 
2.0</licenseFamilyName>
                             <patterns>
                                 <pattern>Licensed to the Apache Software 
Foundation (ASF) under one</pattern>


Reply via email to