This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a7933c8e [lake/iceberg] Introduce IcebergRecordAsFlussRow for iceberg 
union read (#1672)
5a7933c8e is described below

commit 5a7933c8e5d35ef820892c4dfd71245fe3791c9c
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Sep 10 16:02:12 2025 +0800

    [lake/iceberg] Introduce IcebergRecordAsFlussRow for iceberg union read 
(#1672)
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |   3 +-
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |   2 +-
 .../iceberg/source/IcebergRecordAsFlussRow.java    | 149 ++++++++++++++++++++
 .../source/IcebergRecordAsFlussRowTest.java        | 150 +++++++++++++++++++++
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |   2 +-
 .../paimon/tiering/FlussRecordAsPaimonRow.java     |   7 +-
 .../lake/paimon/utils/PaimonRowAsFlussRow.java     |   4 +-
 7 files changed, 309 insertions(+), 8 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index f80cec66b..b5fe686b0 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -68,13 +68,14 @@
             <version>${iceberg.version}</version>
         </dependency>
 
-        <!-- test dependency -->
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-common</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <!-- test dependency -->
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-flink-common</artifactId>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 25b101608..1f3a2b8cb 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -56,7 +56,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
 
     public static final String ICEBERG_CATALOG_DEFAULT_NAME = 
"fluss-iceberg-catalog";
 
-    private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
+    public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
 
     static {
         // We need __bucket system column to filter out the given bucket
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
new file mode 100644
index 000000000..da9c564e1
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.utils.BytesUtils;
+
+import org.apache.iceberg.data.Record;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+
+/** Adapter for Iceberg Record as fluss row. */
+public class IcebergRecordAsFlussRow implements InternalRow {
+
+    private Record icebergRecord;
+
+    public IcebergRecordAsFlussRow() {}
+
+    public void setIcebergRecord(Record icebergRecord) {
+        this.icebergRecord = icebergRecord;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return icebergRecord.struct().fields().size() - SYSTEM_COLUMNS.size();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return icebergRecord.get(pos) == null;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return (boolean) icebergRecord.get(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        Object value = icebergRecord.get(pos);
+        // Iceberg stores TINYINT as Integer, need to cast to byte
+        return ((Integer) value).byteValue();
+    }
+
+    @Override
+    public short getShort(int pos) {
+        Object value = icebergRecord.get(pos);
+        // Iceberg stores SMALLINT as Integer, need to cast to short
+        return ((Integer) value).shortValue();
+    }
+
+    @Override
+    public int getInt(int pos) {
+        Object value = icebergRecord.get(pos);
+        return (Integer) value;
+    }
+
+    @Override
+    public long getLong(int pos) {
+        Object value = icebergRecord.get(pos);
+        return (Long) value;
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        Object value = icebergRecord.get(pos);
+        return (float) value;
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        Object value = icebergRecord.get(pos);
+        return (double) value;
+    }
+
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        String value = (String) icebergRecord.get(pos);
+        return BinaryString.fromBytes(value.getBytes());
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        String value = (String) icebergRecord.get(pos);
+        return BinaryString.fromBytes(value.getBytes());
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        BigDecimal bigDecimal = (BigDecimal) icebergRecord.get(pos);
+        return Decimal.fromBigDecimal(bigDecimal, precision, scale);
+    }
+
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        Object value = icebergRecord.get(pos);
+        if (value == null) {
+            throw new IllegalStateException("Value at position " + pos + " is 
null");
+        }
+        LocalDateTime localDateTime = (LocalDateTime) value;
+        return TimestampNtz.fromLocalDateTime(localDateTime);
+    }
+
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        Object value = icebergRecord.get(pos);
+        OffsetDateTime offsetDateTime = (OffsetDateTime) value;
+        return TimestampLtz.fromInstant(offsetDateTime.toInstant());
+    }
+
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        Object value = icebergRecord.get(pos);
+        ByteBuffer byteBuffer = (ByteBuffer) value;
+        return BytesUtils.toArray(byteBuffer);
+    }
+
+    @Override
+    public byte[] getBytes(int pos) {
+        Object value = icebergRecord.get(pos);
+        ByteBuffer byteBuffer = (ByteBuffer) value;
+        return BytesUtils.toArray(byteBuffer);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
new file mode 100644
index 000000000..85cbe9f68
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link IcebergRecordAsFlussRow}. */
+class IcebergRecordAsFlussRowTest {
+
+    private IcebergRecordAsFlussRow icebergRecordAsFlussRow;
+    private Record record;
+
+    @BeforeEach
+    void setUp() {
+        icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();
+
+        // Create a schema with various data types
+        Schema schema =
+                new Schema(
+                        required(1, "id", Types.LongType.get()),
+                        optional(2, "name", Types.StringType.get()),
+                        optional(3, "age", Types.IntegerType.get()),
+                        optional(4, "salary", Types.DoubleType.get()),
+                        optional(5, "is_active", Types.BooleanType.get()),
+                        optional(6, "tiny_int", Types.IntegerType.get()),
+                        optional(7, "small_int", Types.IntegerType.get()),
+                        optional(8, "float_val", Types.FloatType.get()),
+                        optional(9, "decimal_val", Types.DecimalType.of(10, 
2)),
+                        optional(10, "timestamp_ntz", 
Types.TimestampType.withoutZone()),
+                        optional(11, "timestamp_ltz", 
Types.TimestampType.withZone()),
+                        optional(12, "binary_data", Types.BinaryType.get()),
+                        optional(13, "char_data", Types.StringType.get()),
+                        // System columns
+                        required(14, "__bucket", Types.IntegerType.get()),
+                        required(15, "__offset", Types.LongType.get()),
+                        required(16, "__timestamp", 
Types.TimestampType.withZone()));
+
+        record = GenericRecord.create(schema);
+    }
+
+    @Test
+    void testGetFieldCount() {
+        // Set up record with data
+        record.setField("id", 1L);
+        record.setField("name", "John");
+        record.setField("age", 30);
+        record.setField("__bucket", 1);
+        record.setField("__offset", 100L);
+        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+        icebergRecordAsFlussRow.setIcebergRecord(record);
+
+        // Should return count excluding system columns (3 system columns)
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+    }
+
+    @Test
+    void testIsNullAt() {
+        record.setField("id", 1L);
+        record.setField("name", null); // null value
+        record.setField("age", 30);
+
+        icebergRecordAsFlussRow.setIcebergRecord(record);
+
+        assertThat(icebergRecordAsFlussRow.isNullAt(0)).isFalse(); // id
+        assertThat(icebergRecordAsFlussRow.isNullAt(1)).isTrue(); // name
+        assertThat(icebergRecordAsFlussRow.isNullAt(2)).isFalse(); // age
+    }
+
+    @Test
+    void testAllDataTypes() {
+        // Set up all data types with test values
+        record.setField("id", 12345L);
+        record.setField("name", "John Doe");
+        record.setField("age", 30);
+        record.setField("salary", 50000.50);
+        record.setField("is_active", true);
+        record.setField("tiny_int", 127);
+        record.setField("small_int", 32767);
+        record.setField("float_val", 3.14f);
+        record.setField("decimal_val", new BigDecimal("123.45"));
+        record.setField("timestamp_ntz", LocalDateTime.of(2023, 12, 25, 10, 
30, 45));
+        record.setField(
+                "timestamp_ltz", OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 
0, ZoneOffset.UTC));
+        record.setField("binary_data", ByteBuffer.wrap("Hello 
World".getBytes()));
+        record.setField("char_data", "Hello");
+        // System columns
+        record.setField("__bucket", 1);
+        record.setField("__offset", 100L);
+        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+        icebergRecordAsFlussRow.setIcebergRecord(record);
+
+        // Test all data type conversions
+        assertThat(icebergRecordAsFlussRow.getLong(0)).isEqualTo(12345L); // id
+        
assertThat(icebergRecordAsFlussRow.getString(1).toString()).isEqualTo("John 
Doe"); // name
+        assertThat(icebergRecordAsFlussRow.getInt(2)).isEqualTo(30); // age
+        assertThat(icebergRecordAsFlussRow.getDouble(3)).isEqualTo(50000.50); 
// salary
+        assertThat(icebergRecordAsFlussRow.getBoolean(4)).isTrue(); // 
is_active
+        assertThat(icebergRecordAsFlussRow.getByte(5)).isEqualTo((byte) 127); 
// tiny_int
+        assertThat(icebergRecordAsFlussRow.getShort(6)).isEqualTo((short) 
32767); // small_int
+        assertThat(icebergRecordAsFlussRow.getFloat(7)).isEqualTo(3.14f); // 
float_val
+        assertThat(icebergRecordAsFlussRow.getDecimal(8, 10, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("123.45")); // decimal_val
+        assertThat(icebergRecordAsFlussRow.getTimestampNtz(9, 
3).toLocalDateTime())
+                .isEqualTo(LocalDateTime.of(2023, 12, 25, 10, 30, 45)); // 
timestamp_ntz
+        assertThat(icebergRecordAsFlussRow.getTimestampLtz(10, 3).toInstant())
+                .isEqualTo(
+                        OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 0, 
ZoneOffset.UTC)
+                                .toInstant()); // timestamp_ltz
+        assertThat(icebergRecordAsFlussRow.getBytes(11))
+                .isEqualTo("Hello World".getBytes()); // binary_data
+        assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
+                .isEqualTo("Hello"); // char_data
+
+        // Test field count (excluding system columns)
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index fc98372ab..9504ccc51 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -46,7 +46,7 @@ import static 
org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 /** A Paimon implementation of {@link LakeCatalog}. */
 public class PaimonLakeCatalog implements LakeCatalog {
 
-    private static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
+    public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
 
     static {
         // We need __bucket system column to filter out the given bucket
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index b237365cb..24304d0e4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -25,14 +25,13 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
 
-    // Lake table for paimon will append three system columns: __bucket, 
__offset,__timestamp
-    private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
@@ -47,7 +46,7 @@ public class FlussRecordAsPaimonRow extends 
FlussRowAsPaimonRow {
         this.internalRow = logRecord.getRow();
         this.originRowFieldCount = internalRow.getFieldCount();
         checkState(
-                originRowFieldCount == tableRowType.getFieldCount() - 
LAKE_PAIMON_SYSTEM_COLUMNS,
+                originRowFieldCount == tableRowType.getFieldCount() - 
SYSTEM_COLUMNS.size(),
                 "The paimon table fields count must equals to LogRecord's 
fields count.");
     }
 
@@ -56,7 +55,7 @@ public class FlussRecordAsPaimonRow extends 
FlussRowAsPaimonRow {
         return
         //  business (including partitions) + system (three system fields: 
bucket, offset,
         // timestamp)
-        originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
+        originRowFieldCount + SYSTEM_COLUMNS.size();
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
index 5c66fb3eb..fbb2e5527 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
@@ -25,6 +25,8 @@ import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.paimon.data.Timestamp;
 
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
+
 /** Adapter for paimon row as fluss row. */
 public class PaimonRowAsFlussRow implements InternalRow {
 
@@ -43,7 +45,7 @@ public class PaimonRowAsFlussRow implements InternalRow {
 
     @Override
     public int getFieldCount() {
-        return paimonRow.getFieldCount();
+        return paimonRow.getFieldCount() - SYSTEM_COLUMNS.size();
     }
 
     @Override

Reply via email to