This is an automated email from the ASF dual-hosted git repository.

leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 649bb41ae [lake/lance] Add NestedRow type support for Lance (#2578)
649bb41ae is described below

commit 649bb41ae7dacc88110025540f205f660418fc6d
Author: ForwardXu <[email protected]>
AuthorDate: Mon Mar 23 17:07:30 2026 +0800

    [lake/lance] Add NestedRow type support for Lance (#2578)
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * [lake/lance] Add NestedRow type support for Lance
    
    * Remove unrelated mdx file change
    
    ---------
    
    Co-authored-by: Keith Lee <[email protected]>
---
 .../lake/lance/tiering/ShadedArrowBatchWriter.java |   7 +
 .../fluss/lake/lance/utils/ArrowDataConverter.java |  76 ++++++++
 .../fluss/lake/lance/utils/LanceArrowUtils.java    |  10 +
 .../lance/testutils/FlinkLanceTieringTestBase.java |  68 +++++++
 .../lake/lance/tiering/LanceTieringITCase.java     | 206 +++++++++++++++++++++
 .../fluss/lake/lance/tiering/LanceTieringTest.java | 162 ++++++++++++++++
 .../lake/lance/utils/LanceArrowUtilsTest.java      | 169 ++++++++++++++++-
 7 files changed, 697 insertions(+), 1 deletion(-)

diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
index 21b2aa1ec..5ab4ca547 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
@@ -25,6 +25,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVe
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.ArrowUtils;
 
@@ -107,6 +108,12 @@ public class ShadedArrowBatchWriter implements AutoCloseable {
             if (dataVector != null) {
                 initFieldVector(dataVector);
             }
+        } else if (fieldVector instanceof StructVector) {
+            StructVector structVector = (StructVector) fieldVector;
+            structVector.allocateNew();
+            for (FieldVector childVector : 
structVector.getChildrenFromFields()) {
+                initFieldVector(childVector);
+            }
         } else {
             fieldVector.allocateNew();
         }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
index 0694b7fcb..2c1e7dcd6 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -23,6 +23,7 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.nio.ByteBuffer;
@@ -91,9 +92,46 @@ public class ArrowDataConverter {
                                 shadedVector,
                         (ListVector) nonShadedVector);
                 return;
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Shaded vector is ListVector but non-shaded 
vector is %s, expected ListVector or FixedSizeListVector.",
+                                nonShadedVector.getClass().getSimpleName()));
             }
         }
 
+        if (shadedVector
+                instanceof
+                
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector) {
+            if (!(nonShadedVector instanceof StructVector)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Shaded vector is StructVector but non-shaded 
vector is %s, expected StructVector.",
+                                nonShadedVector.getClass().getSimpleName()));
+            }
+            copyStructVectorData(
+                    
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector)
+                            shadedVector,
+                    (StructVector) nonShadedVector);
+            return;
+        }
+
+        if (nonShadedVector instanceof StructVector) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Non-shaded vector is StructVector but shaded 
vector is %s, expected shaded StructVector.",
+                            shadedVector.getClass().getSimpleName()));
+        }
+
+        if (nonShadedVector instanceof ListVector
+                || nonShadedVector instanceof FixedSizeListVector) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Non-shaded vector is %s but shaded vector is %s, 
expected shaded ListVector.",
+                            nonShadedVector.getClass().getSimpleName(),
+                            shadedVector.getClass().getSimpleName()));
+        }
+
         List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
                 shadedVector.getFieldBuffers();
 
@@ -281,4 +319,42 @@ public class ArrowDataConverter {
             }
         }
     }
+
+    private static void copyStructVectorData(
+            
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector
+                    shadedStructVector,
+            StructVector nonShadedStructVector) {
+
+        // First, recursively copy all child vectors
+        
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> 
shadedChildren =
+                shadedStructVector.getChildrenFromFields();
+        List<FieldVector> nonShadedChildren = 
nonShadedStructVector.getChildrenFromFields();
+
+        for (int i = 0; i < Math.min(shadedChildren.size(), 
nonShadedChildren.size()); i++) {
+            copyVectorData(shadedChildren.get(i), nonShadedChildren.get(i));
+        }
+
+        // Copy the StructVector's own validity buffer
+        List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
+                shadedStructVector.getFieldBuffers();
+        List<ArrowBuf> nonShadedBuffers = 
nonShadedStructVector.getFieldBuffers();
+
+        for (int i = 0; i < Math.min(shadedBuffers.size(), 
nonShadedBuffers.size()); i++) {
+            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
shadedBuf =
+                    shadedBuffers.get(i);
+            ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+            long size = Math.min(shadedBuf.capacity(), 
nonShadedBuf.capacity());
+            if (size > 0) {
+                ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+                srcBuffer.position(0);
+                srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+                nonShadedBuf.setBytes(0, srcBuffer);
+            }
+        }
+
+        // Set value count
+        int valueCount = shadedStructVector.getValueCount();
+        nonShadedStructVector.setValueCount(valueCount);
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index ed85b1a8c..e0d8a0b3a 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.BooleanType;
 import org.apache.fluss.types.BytesType;
 import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DateType;
 import org.apache.fluss.types.DecimalType;
@@ -45,6 +46,7 @@ import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -125,6 +127,12 @@ public class LanceArrowUtils {
                                     "element",
                                     ((ArrayType) logicalType).getElementType(),
                                     tableProperties));
+        } else if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            children = new ArrayList<>(rowType.getFieldCount());
+            for (DataField field : rowType.getFields()) {
+                children.add(toArrowField(field.getName(), field.getType(), 
tableProperties));
+            }
         }
         return new Field(fieldName, fieldType, children);
     }
@@ -192,6 +200,8 @@ public class LanceArrowUtils {
             }
         } else if (dataType instanceof ArrayType) {
             return ArrowType.List.INSTANCE;
+        } else if (dataType instanceof RowType) {
+            return ArrowType.Struct.INSTANCE;
         } else {
             throw new UnsupportedOperationException(
                     String.format(
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 64fc8f71f..08c3ef223 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -197,6 +197,74 @@ public class FlinkLanceTieringTestBase {
         return createTable(tablePath, tableBuilder.build());
     }
 
+    protected long createLogTableWithNestedRowType(TablePath tablePath) throws 
Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "address",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("city", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("zip", 
DataTypes.INT())));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
+    protected long createLogTableWithNestedRowOfRowType(TablePath tablePath) 
throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "contact",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("phone", 
DataTypes.STRING()),
+                                        DataTypes.FIELD(
+                                                "address",
+                                                DataTypes.ROW(
+                                                        
DataTypes.FIELD("city", DataTypes.STRING()),
+                                                        DataTypes.FIELD("zip", 
DataTypes.INT())))));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
+    protected long createLogTableWithArrayOfRowType(TablePath tablePath) 
throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "items",
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("item_name", 
DataTypes.STRING()),
+                                                DataTypes.FIELD("quantity", 
DataTypes.INT()))));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
     protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
             throws Exception {
         try (Table table = conn.getTable(tablePath)) {
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
index 74be68e2d..29aaef965 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.server.zk.data.lake.LakeTable;
 
 import com.lancedb.lance.Dataset;
@@ -140,6 +142,172 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testTieringWithNestedRowType() throws Exception {
+        // Test: Log table with nested Row type
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithNestedRow");
+        long t1Id = createLogTableWithNestedRowType(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+        // Create nested row data
+        for (int i = 0; i < 10; i++) {
+            GenericRow addressRow1 = new GenericRow(2);
+            addressRow1.setField(0, BinaryString.fromString("New York"));
+            addressRow1.setField(1, 10001);
+
+            GenericRow addressRow2 = new GenericRow(2);
+            addressRow2.setField(0, BinaryString.fromString("Los Angeles"));
+            addressRow2.setField(1, 90001);
+
+            GenericRow addressRow3 = new GenericRow(2);
+            addressRow3.setField(0, BinaryString.fromString("Chicago"));
+            addressRow3.setField(1, 60601);
+
+            writeRows(
+                    t1,
+                    Arrays.asList(
+                            row(1, "Alice", addressRow1),
+                            row(2, "Bob", addressRow2),
+                            row(3, "Charlie", addressRow3)),
+                    true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config1 =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance using TSV string comparison
+        String expectedTsv1 = buildExpectedTsvForNestedRowTable(30);
+        checkDataInLance(config1, expectedTsv1);
+        checkSnapshotPropertyInLance(config1, 
Collections.singletonMap(t1Bucket, 30L));
+
+        jobClient.cancel().get();
+    }
+
+    @Test
+    void testTieringWithNestedRowOfRowType() throws Exception {
+        // Test: Log table with Row of Row (nested struct within struct)
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithNestedRowOfRow");
+        long t1Id = createLogTableWithNestedRowOfRowType(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+        // Create nested row of row data
+        for (int i = 0; i < 10; i++) {
+            GenericRow innerAddress1 = new GenericRow(2);
+            innerAddress1.setField(0, BinaryString.fromString("New York"));
+            innerAddress1.setField(1, 10001);
+            GenericRow contact1 = new GenericRow(2);
+            contact1.setField(0, BinaryString.fromString("111-1111"));
+            contact1.setField(1, innerAddress1);
+
+            GenericRow innerAddress2 = new GenericRow(2);
+            innerAddress2.setField(0, BinaryString.fromString("Los Angeles"));
+            innerAddress2.setField(1, 90001);
+            GenericRow contact2 = new GenericRow(2);
+            contact2.setField(0, BinaryString.fromString("222-2222"));
+            contact2.setField(1, innerAddress2);
+
+            GenericRow innerAddress3 = new GenericRow(2);
+            innerAddress3.setField(0, BinaryString.fromString("Chicago"));
+            innerAddress3.setField(1, 60601);
+            GenericRow contact3 = new GenericRow(2);
+            contact3.setField(0, BinaryString.fromString("333-3333"));
+            contact3.setField(1, innerAddress3);
+
+            writeRows(
+                    t1,
+                    Arrays.asList(
+                            row(1, "Alice", contact1),
+                            row(2, "Bob", contact2),
+                            row(3, "Charlie", contact3)),
+                    true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config1 =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance using TSV string comparison
+        String expectedTsv1 = buildExpectedTsvForNestedRowOfRowTable(30);
+        checkDataInLance(config1, expectedTsv1);
+        checkSnapshotPropertyInLance(config1, 
Collections.singletonMap(t1Bucket, 30L));
+
+        jobClient.cancel().get();
+    }
+
+    @Test
+    void testTieringWithArrayOfRowType() throws Exception {
+        // Test: Log table with array of Row type
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithArrayOfRow");
+        long t1Id = createLogTableWithArrayOfRowType(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+        // Create array of rows data
+        for (int i = 0; i < 10; i++) {
+            GenericRow item1 = new GenericRow(2);
+            item1.setField(0, BinaryString.fromString("Apple"));
+            item1.setField(1, 5);
+
+            GenericRow item2 = new GenericRow(2);
+            item2.setField(0, BinaryString.fromString("Banana"));
+            item2.setField(1, 3);
+
+            GenericRow item3 = new GenericRow(2);
+            item3.setField(0, BinaryString.fromString("Orange"));
+            item3.setField(1, 7);
+
+            GenericArray items1 = new GenericArray(new Object[] {item1, 
item2});
+            GenericArray items2 = new GenericArray(new Object[] {item3});
+            GenericArray items3 = new GenericArray(new Object[] {item1, item2, 
item3});
+
+            writeRows(
+                    t1,
+                    Arrays.asList(
+                            row(1, "Order1", items1),
+                            row(2, "Order2", items2),
+                            row(3, "Order3", items3)),
+                    true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config1 =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance using TSV string comparison
+        String expectedTsv1 = buildExpectedTsvForArrayOfRowTable(30);
+        checkDataInLance(config1, expectedTsv1);
+        checkSnapshotPropertyInLance(config1, 
Collections.singletonMap(t1Bucket, 30L));
+
+        jobClient.cancel().get();
+    }
+
     private void checkSnapshotPropertyInLance(
             LanceConfig config, Map<TableBucket, Long> expectedOffsets) throws 
Exception {
         ReadOptions.Builder builder = new ReadOptions.Builder();
@@ -208,4 +376,42 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
         }
         return sb.toString();
     }
+
+    private String buildExpectedTsvForNestedRowTable(int rowCount) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("id\tname\taddress\n");
+        for (int i = 0; i < rowCount / 3; i++) {
+            sb.append("1\tAlice\t{\"city\":\"New York\",\"zip\":10001}\n");
+            sb.append("2\tBob\t{\"city\":\"Los Angeles\",\"zip\":90001}\n");
+            sb.append("3\tCharlie\t{\"city\":\"Chicago\",\"zip\":60601}\n");
+        }
+        return sb.toString();
+    }
+
+    private String buildExpectedTsvForNestedRowOfRowTable(int rowCount) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("id\tname\tcontact\n");
+        for (int i = 0; i < rowCount / 3; i++) {
+            sb.append(
+                    
"1\tAlice\t{\"phone\":\"111-1111\",\"address\":{\"city\":\"New 
York\",\"zip\":10001}}\n");
+            sb.append(
+                    
"2\tBob\t{\"phone\":\"222-2222\",\"address\":{\"city\":\"Los 
Angeles\",\"zip\":90001}}\n");
+            sb.append(
+                    
"3\tCharlie\t{\"phone\":\"333-3333\",\"address\":{\"city\":\"Chicago\",\"zip\":60601}}\n");
+        }
+        return sb.toString();
+    }
+
+    private String buildExpectedTsvForArrayOfRowTable(int rowCount) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("id\tname\titems\n");
+        for (int i = 0; i < rowCount / 3; i++) {
+            sb.append(
+                    
"1\tOrder1\t[{\"item_name\":\"Apple\",\"quantity\":5},{\"item_name\":\"Banana\",\"quantity\":3}]\n");
+            
sb.append("2\tOrder2\t[{\"item_name\":\"Orange\",\"quantity\":7}]\n");
+            sb.append(
+                    
"3\tOrder3\t[{\"item_name\":\"Apple\",\"quantity\":5},{\"item_name\":\"Banana\",\"quantity\":3},{\"item_name\":\"Orange\",\"quantity\":7}]\n");
+        }
+        return sb.toString();
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index a7a13ff5e..9a9b9c35b 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.CommitterInitContext;
+import org.apache.fluss.lake.committer.LakeCommitResult;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
@@ -355,4 +356,165 @@ class LanceTieringTest {
 
         return schema;
     }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTableWithNestedRowType(boolean isPartitioned) throws 
Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "nestedRowTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createNestedRowTable(config);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 
1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new 
HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // First, write data with nested row types
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : 
partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, 
tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = 
Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> 
writeAndExpectRecords =
+                            genNestedRowLogRecords(bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = 
writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), 
serialized));
+                }
+            }
+        }
+
+        // Second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = 
lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = 
committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    
Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            LakeCommitResult commitResult =
+                    lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            // lance dataset version starts from 1
+            assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            // Verify data can be read back
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = 
Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = 
recordsByBucket.get(partitionBucket);
+                    verifyNestedRowRecords(readerRoot, expectRecords);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }
+
+    private Schema createNestedRowTable(LanceConfig config) {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "address",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("city", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("zip", 
DataTypes.INT())));
+        Schema schema = schemaBuilder.build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), 
LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        return schema;
+    }
+
+    private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(
+            int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("user" + bucket + 
"_" + i));
+
+            // Create nested address row
+            GenericRow addressRow = new GenericRow(2);
+            addressRow.setField(0, BinaryString.fromString("city" + bucket));
+            addressRow.setField(1, 10000 + bucket);
+            genericRow.setField(2, addressRow);
+
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), 
ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
+    private void verifyNestedRowRecords(VectorSchemaRoot root, List<LogRecord> 
expectRecords) {
+        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+        for (int i = 0; i < expectRecords.size(); i++) {
+            LogRecord expectRecord = expectRecords.get(i);
+            // check id column
+            assertThat((int) (root.getVector(0).getObject(i)))
+                    .isEqualTo(expectRecord.getRow().getInt(0));
+            // check name column
+            assertThat(((VarCharVector) 
root.getVector(1)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+            // For nested row, just verify that the struct vector is not null 
and has correct
+            // structure
+            assertThat(root.getVector(2)).isNotNull();
+        }
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
index d682b0d88..ef224ca7a 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
@@ -27,12 +27,13 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link LanceArrowUtils#toArrowSchema(RowType, Map)}. */
+/** Tests for {@link LanceArrowUtils}. */
 class LanceArrowUtilsTest {
 
     @Test
@@ -131,4 +132,170 @@ class LanceArrowUtilsTest {
         Field embeddingField = schema.findField("embedding");
         
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.List.class);
     }
+
+    @Test
+    void testToArrowSchemaWithNestedRowType() {
+        // Schema under test: id INT, name STRING, address ROW<city STRING, zip INT>
+        RowType addressType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("city", DataTypes.STRING()),
+                        DataTypes.FIELD("zip", DataTypes.INT()));
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", addressType));
+
+        List<Field> topLevel = LanceArrowUtils.toArrowSchema(rowType).getFields();
+        assertThat(topLevel).hasSize(3);
+
+        // Scalar columns map straight onto their Arrow primitive types.
+        assertThat(topLevel.get(0).getName()).isEqualTo("id");
+        assertThat(topLevel.get(0).getType()).isInstanceOf(ArrowType.Int.class);
+        assertThat(topLevel.get(1).getName()).isEqualTo("name");
+        assertThat(topLevel.get(1).getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+
+        // The ROW column becomes a Struct whose children mirror the nested fields in order.
+        Field address = topLevel.get(2);
+        assertThat(address.getName()).isEqualTo("address");
+        assertThat(address.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+
+        List<Field> nested = address.getChildren();
+        assertThat(nested).hasSize(2);
+        assertThat(nested.get(0).getName()).isEqualTo("city");
+        assertThat(nested.get(0).getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+        assertThat(nested.get(1).getName()).isEqualTo("zip");
+        assertThat(nested.get(1).getType()).isInstanceOf(ArrowType.Int.class);
+    }
+
+    @Test
+    void testToArrowSchemaWithDeeplyNestedRowType() {
+        // Schema under test: id INT, level1 ROW<name STRING, level2 ROW<value DOUBLE>>
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD(
+                                "level1",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("name", DataTypes.STRING()),
+                                        DataTypes.FIELD(
+                                                "level2",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "value", DataTypes.DOUBLE()))))));
+
+        Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
+
+        // Verify the schema structure
+        assertThat(arrowSchema.getFields()).hasSize(2);
+
+        // Verify the top-level scalar column alongside the nested one
+        Field idField = arrowSchema.getFields().get(0);
+        assertThat(idField.getName()).isEqualTo("id");
+        assertThat(idField.getType()).isInstanceOf(ArrowType.Int.class);
+
+        // Verify level1 struct
+        Field level1Field = arrowSchema.getFields().get(1);
+        assertThat(level1Field.getName()).isEqualTo("level1");
+        assertThat(level1Field.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+        assertThat(level1Field.getChildren()).hasSize(2);
+
+        // Verify the scalar child of level1, which sits next to the nested struct
+        Field nameField = level1Field.getChildren().get(0);
+        assertThat(nameField.getName()).isEqualTo("name");
+        assertThat(nameField.getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+
+        // Verify level2 struct
+        Field level2Field = level1Field.getChildren().get(1);
+        assertThat(level2Field.getName()).isEqualTo("level2");
+        assertThat(level2Field.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+        assertThat(level2Field.getChildren()).hasSize(1);
+
+        // Verify value field in level2 is a double-precision float, not just any float width
+        Field valueField = level2Field.getChildren().get(0);
+        assertThat(valueField.getName()).isEqualTo("value");
+        assertThat(valueField.getType()).isInstanceOf(ArrowType.FloatingPoint.class);
+        assertThat(((ArrowType.FloatingPoint) valueField.getType()).getPrecision())
+                .hasToString("DOUBLE");
+    }
+
+    @Test
+    void testToArrowSchemaWithArrayOfRowType() {
+        // Schema under test: id INT, items ARRAY<ROW<name STRING, price DOUBLE>>
+        RowType itemType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("price", DataTypes.DOUBLE()));
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("items", DataTypes.ARRAY(itemType)));
+
+        List<Field> columns = LanceArrowUtils.toArrowSchema(rowType).getFields();
+        assertThat(columns).hasSize(2);
+
+        // The ARRAY column is an Arrow List carrying exactly one child field.
+        Field items = columns.get(1);
+        assertThat(items.getName()).isEqualTo("items");
+        assertThat(items.getType()).isEqualTo(ArrowType.List.INSTANCE);
+        assertThat(items.getChildren()).hasSize(1);
+
+        // That child, named "element", is a Struct holding the row's fields in declaration order.
+        Field element = items.getChildren().get(0);
+        assertThat(element.getName()).isEqualTo("element");
+        assertThat(element.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+
+        List<Field> elementFields = element.getChildren();
+        assertThat(elementFields).hasSize(2);
+        assertThat(elementFields.get(0).getName()).isEqualTo("name");
+        assertThat(elementFields.get(0).getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+        assertThat(elementFields.get(1).getName()).isEqualTo("price");
+        assertThat(elementFields.get(1).getType())
+                .isInstanceOf(ArrowType.FloatingPoint.class);
+    }
+
+    @Test
+    void testToArrowSchemaWithRowContainingArray() {
+        // Schema under test: id INT, data ROW<tags ARRAY<STRING>, count INT>
+        RowType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING())),
+                        DataTypes.FIELD("count", DataTypes.INT()));
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("data", dataType));
+
+        List<Field> columns = LanceArrowUtils.toArrowSchema(rowType).getFields();
+        assertThat(columns).hasSize(2);
+
+        // The ROW column is a Struct with one child per nested field.
+        Field data = columns.get(1);
+        assertThat(data.getName()).isEqualTo("data");
+        assertThat(data.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+        assertThat(data.getChildren()).hasSize(2);
+
+        // The ARRAY child nested inside the struct is a List whose single "element" is Utf8.
+        Field tags = data.getChildren().get(0);
+        assertThat(tags.getName()).isEqualTo("tags");
+        assertThat(tags.getType()).isEqualTo(ArrowType.List.INSTANCE);
+        assertThat(tags.getChildren()).hasSize(1);
+
+        Field tagElement = tags.getChildren().get(0);
+        assertThat(tagElement.getName()).isEqualTo("element");
+        assertThat(tagElement.getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+    }
 }


Reply via email to