This is an automated email from the ASF dual-hosted git repository.
leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 649bb41ae [lake/lance] Add NestedRow type support for Lance (#2578)
649bb41ae is described below
commit 649bb41ae7dacc88110025540f205f660418fc6d
Author: ForwardXu <[email protected]>
AuthorDate: Mon Mar 23 17:07:30 2026 +0800
[lake/lance] Add NestedRow type support for Lance (#2578)
* [lake/lance] Add NestedRow type support for Lance
* [lake/lance] Add NestedRow type support for Lance
* [lake/lance] Add NestedRow type support for Lance
* [lake/lance] Add NestedRow type support for Lance
* [lake/lance] Add NestedRow type support for Lance
* [lake/lance] Add NestedRow type support for Lance
* Remove unrelated mdx file change
---------
Co-authored-by: Keith Lee <[email protected]>
---
.../lake/lance/tiering/ShadedArrowBatchWriter.java | 7 +
.../fluss/lake/lance/utils/ArrowDataConverter.java | 76 ++++++++
.../fluss/lake/lance/utils/LanceArrowUtils.java | 10 +
.../lance/testutils/FlinkLanceTieringTestBase.java | 68 +++++++
.../lake/lance/tiering/LanceTieringITCase.java | 206 +++++++++++++++++++++
.../fluss/lake/lance/tiering/LanceTieringTest.java | 162 ++++++++++++++++
.../lake/lance/utils/LanceArrowUtilsTest.java | 169 ++++++++++++++++-
7 files changed, 697 insertions(+), 1 deletion(-)
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
index 21b2aa1ec..5ab4ca547 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
@@ -25,6 +25,7 @@ import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
+import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
@@ -107,6 +108,12 @@ public class ShadedArrowBatchWriter implements
AutoCloseable {
if (dataVector != null) {
initFieldVector(dataVector);
}
+ } else if (fieldVector instanceof StructVector) {
+ StructVector structVector = (StructVector) fieldVector;
+ structVector.allocateNew();
+ for (FieldVector childVector :
structVector.getChildrenFromFields()) {
+ initFieldVector(childVector);
+ }
} else {
fieldVector.allocateNew();
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
index 0694b7fcb..2c1e7dcd6 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -23,6 +23,7 @@ import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.pojo.Schema;
import java.nio.ByteBuffer;
@@ -91,9 +92,46 @@ public class ArrowDataConverter {
shadedVector,
(ListVector) nonShadedVector);
return;
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Shaded vector is ListVector but non-shaded
vector is %s, expected ListVector or FixedSizeListVector.",
+ nonShadedVector.getClass().getSimpleName()));
}
}
+ if (shadedVector
+ instanceof
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector) {
+ if (!(nonShadedVector instanceof StructVector)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Shaded vector is StructVector but non-shaded
vector is %s, expected StructVector.",
+ nonShadedVector.getClass().getSimpleName()));
+ }
+ copyStructVectorData(
+
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector)
+ shadedVector,
+ (StructVector) nonShadedVector);
+ return;
+ }
+
+ if (nonShadedVector instanceof StructVector) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Non-shaded vector is StructVector but shaded
vector is %s, expected shaded StructVector.",
+ shadedVector.getClass().getSimpleName()));
+ }
+
+ if (nonShadedVector instanceof ListVector
+ || nonShadedVector instanceof FixedSizeListVector) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Non-shaded vector is %s but shaded vector is %s,
expected shaded ListVector.",
+ nonShadedVector.getClass().getSimpleName(),
+ shadedVector.getClass().getSimpleName()));
+ }
+
List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
shadedBuffers =
shadedVector.getFieldBuffers();
@@ -281,4 +319,42 @@ public class ArrowDataConverter {
}
}
}
+
+ private static void copyStructVectorData(
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector
+ shadedStructVector,
+ StructVector nonShadedStructVector) {
+
+ // First, recursively copy all child vectors
+
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector>
shadedChildren =
+ shadedStructVector.getChildrenFromFields();
+ List<FieldVector> nonShadedChildren =
nonShadedStructVector.getChildrenFromFields();
+
+ for (int i = 0; i < Math.min(shadedChildren.size(),
nonShadedChildren.size()); i++) {
+ copyVectorData(shadedChildren.get(i), nonShadedChildren.get(i));
+ }
+
+ // Copy the StructVector's own validity buffer
+ List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
shadedBuffers =
+ shadedStructVector.getFieldBuffers();
+ List<ArrowBuf> nonShadedBuffers =
nonShadedStructVector.getFieldBuffers();
+
+ for (int i = 0; i < Math.min(shadedBuffers.size(),
nonShadedBuffers.size()); i++) {
+ org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf
shadedBuf =
+ shadedBuffers.get(i);
+ ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+ long size = Math.min(shadedBuf.capacity(),
nonShadedBuf.capacity());
+ if (size > 0) {
+ ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+ srcBuffer.position(0);
+ srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+ nonShadedBuf.setBytes(0, srcBuffer);
+ }
+ }
+
+ // Set value count
+ int valueCount = shadedStructVector.getValueCount();
+ nonShadedStructVector.setValueCount(valueCount);
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index ed85b1a8c..e0d8a0b3a 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
@@ -45,6 +46,7 @@ import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -125,6 +127,12 @@ public class LanceArrowUtils {
"element",
((ArrayType) logicalType).getElementType(),
tableProperties));
+ } else if (logicalType instanceof RowType) {
+ RowType rowType = (RowType) logicalType;
+ children = new ArrayList<>(rowType.getFieldCount());
+ for (DataField field : rowType.getFields()) {
+ children.add(toArrowField(field.getName(), field.getType(),
tableProperties));
+ }
}
return new Field(fieldName, fieldType, children);
}
@@ -192,6 +200,8 @@ public class LanceArrowUtils {
}
} else if (dataType instanceof ArrayType) {
return ArrowType.List.INSTANCE;
+ } else if (dataType instanceof RowType) {
+ return ArrowType.Struct.INSTANCE;
} else {
throw new UnsupportedOperationException(
String.format(
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 64fc8f71f..08c3ef223 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -197,6 +197,74 @@ public class FlinkLanceTieringTestBase {
return createTable(tablePath, tableBuilder.build());
}
+ protected long createLogTableWithNestedRowType(TablePath tablePath) throws
Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column(
+ "address",
+ DataTypes.ROW(
+ DataTypes.FIELD("city",
DataTypes.STRING()),
+ DataTypes.FIELD("zip",
DataTypes.INT())));
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(1, "id")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ protected long createLogTableWithNestedRowOfRowType(TablePath tablePath)
throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column(
+ "contact",
+ DataTypes.ROW(
+ DataTypes.FIELD("phone",
DataTypes.STRING()),
+ DataTypes.FIELD(
+ "address",
+ DataTypes.ROW(
+
DataTypes.FIELD("city", DataTypes.STRING()),
+ DataTypes.FIELD("zip",
DataTypes.INT())))));
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(1, "id")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ protected long createLogTableWithArrayOfRowType(TablePath tablePath)
throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column(
+ "items",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("item_name",
DataTypes.STRING()),
+ DataTypes.FIELD("quantity",
DataTypes.INT()))));
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(1, "id")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
protected void writeRows(TablePath tablePath, List<InternalRow> rows,
boolean append)
throws Exception {
try (Table table = conn.getTable(tablePath)) {
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
index 74be68e2d..29aaef965 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
import org.apache.fluss.server.zk.data.lake.LakeTable;
import com.lancedb.lance.Dataset;
@@ -140,6 +142,172 @@ class LanceTieringITCase extends
FlinkLanceTieringTestBase {
jobClient.cancel().get();
}
+ @Test
+ void testTieringWithNestedRowType() throws Exception {
+ // Test: Log table with nested Row type
+ TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithNestedRow");
+ long t1Id = createLogTableWithNestedRowType(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ // Create nested row data
+ for (int i = 0; i < 10; i++) {
+ GenericRow addressRow1 = new GenericRow(2);
+ addressRow1.setField(0, BinaryString.fromString("New York"));
+ addressRow1.setField(1, 10001);
+
+ GenericRow addressRow2 = new GenericRow(2);
+ addressRow2.setField(0, BinaryString.fromString("Los Angeles"));
+ addressRow2.setField(1, 90001);
+
+ GenericRow addressRow3 = new GenericRow(2);
+ addressRow3.setField(0, BinaryString.fromString("Chicago"));
+ addressRow3.setField(1, 60601);
+
+ writeRows(
+ t1,
+ Arrays.asList(
+ row(1, "Alice", addressRow1),
+ row(2, "Bob", addressRow2),
+ row(3, "Charlie", addressRow3)),
+ true);
+ }
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 30);
+
+ LanceConfig config1 =
+ LanceConfig.from(
+ lanceConf.toMap(),
+ Collections.emptyMap(),
+ t1.getDatabaseName(),
+ t1.getTableName());
+
+ // check data in lance using TSV string comparison
+ String expectedTsv1 = buildExpectedTsvForNestedRowTable(30);
+ checkDataInLance(config1, expectedTsv1);
+ checkSnapshotPropertyInLance(config1,
Collections.singletonMap(t1Bucket, 30L));
+
+ jobClient.cancel().get();
+ }
+
+ @Test
+ void testTieringWithNestedRowOfRowType() throws Exception {
+ // Test: Log table with Row of Row (nested struct within struct)
+ TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithNestedRowOfRow");
+ long t1Id = createLogTableWithNestedRowOfRowType(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ // Create nested row of row data
+ for (int i = 0; i < 10; i++) {
+ GenericRow innerAddress1 = new GenericRow(2);
+ innerAddress1.setField(0, BinaryString.fromString("New York"));
+ innerAddress1.setField(1, 10001);
+ GenericRow contact1 = new GenericRow(2);
+ contact1.setField(0, BinaryString.fromString("111-1111"));
+ contact1.setField(1, innerAddress1);
+
+ GenericRow innerAddress2 = new GenericRow(2);
+ innerAddress2.setField(0, BinaryString.fromString("Los Angeles"));
+ innerAddress2.setField(1, 90001);
+ GenericRow contact2 = new GenericRow(2);
+ contact2.setField(0, BinaryString.fromString("222-2222"));
+ contact2.setField(1, innerAddress2);
+
+ GenericRow innerAddress3 = new GenericRow(2);
+ innerAddress3.setField(0, BinaryString.fromString("Chicago"));
+ innerAddress3.setField(1, 60601);
+ GenericRow contact3 = new GenericRow(2);
+ contact3.setField(0, BinaryString.fromString("333-3333"));
+ contact3.setField(1, innerAddress3);
+
+ writeRows(
+ t1,
+ Arrays.asList(
+ row(1, "Alice", contact1),
+ row(2, "Bob", contact2),
+ row(3, "Charlie", contact3)),
+ true);
+ }
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 30);
+
+ LanceConfig config1 =
+ LanceConfig.from(
+ lanceConf.toMap(),
+ Collections.emptyMap(),
+ t1.getDatabaseName(),
+ t1.getTableName());
+
+ // check data in lance using TSV string comparison
+ String expectedTsv1 = buildExpectedTsvForNestedRowOfRowTable(30);
+ checkDataInLance(config1, expectedTsv1);
+ checkSnapshotPropertyInLance(config1,
Collections.singletonMap(t1Bucket, 30L));
+
+ jobClient.cancel().get();
+ }
+
+ @Test
+ void testTieringWithArrayOfRowType() throws Exception {
+ // Test: Log table with array of Row type
+ TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithArrayOfRow");
+ long t1Id = createLogTableWithArrayOfRowType(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ // Create array of rows data
+ for (int i = 0; i < 10; i++) {
+ GenericRow item1 = new GenericRow(2);
+ item1.setField(0, BinaryString.fromString("Apple"));
+ item1.setField(1, 5);
+
+ GenericRow item2 = new GenericRow(2);
+ item2.setField(0, BinaryString.fromString("Banana"));
+ item2.setField(1, 3);
+
+ GenericRow item3 = new GenericRow(2);
+ item3.setField(0, BinaryString.fromString("Orange"));
+ item3.setField(1, 7);
+
+ GenericArray items1 = new GenericArray(new Object[] {item1,
item2});
+ GenericArray items2 = new GenericArray(new Object[] {item3});
+ GenericArray items3 = new GenericArray(new Object[] {item1, item2,
item3});
+
+ writeRows(
+ t1,
+ Arrays.asList(
+ row(1, "Order1", items1),
+ row(2, "Order2", items2),
+ row(3, "Order3", items3)),
+ true);
+ }
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 30);
+
+ LanceConfig config1 =
+ LanceConfig.from(
+ lanceConf.toMap(),
+ Collections.emptyMap(),
+ t1.getDatabaseName(),
+ t1.getTableName());
+
+ // check data in lance using TSV string comparison
+ String expectedTsv1 = buildExpectedTsvForArrayOfRowTable(30);
+ checkDataInLance(config1, expectedTsv1);
+ checkSnapshotPropertyInLance(config1,
Collections.singletonMap(t1Bucket, 30L));
+
+ jobClient.cancel().get();
+ }
+
private void checkSnapshotPropertyInLance(
LanceConfig config, Map<TableBucket, Long> expectedOffsets) throws
Exception {
ReadOptions.Builder builder = new ReadOptions.Builder();
@@ -208,4 +376,42 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase
{
}
return sb.toString();
}
+
+ private String buildExpectedTsvForNestedRowTable(int rowCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("id\tname\taddress\n");
+ for (int i = 0; i < rowCount / 3; i++) {
+ sb.append("1\tAlice\t{\"city\":\"New York\",\"zip\":10001}\n");
+ sb.append("2\tBob\t{\"city\":\"Los Angeles\",\"zip\":90001}\n");
+ sb.append("3\tCharlie\t{\"city\":\"Chicago\",\"zip\":60601}\n");
+ }
+ return sb.toString();
+ }
+
+ private String buildExpectedTsvForNestedRowOfRowTable(int rowCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("id\tname\tcontact\n");
+ for (int i = 0; i < rowCount / 3; i++) {
+ sb.append(
+
"1\tAlice\t{\"phone\":\"111-1111\",\"address\":{\"city\":\"New
York\",\"zip\":10001}}\n");
+ sb.append(
+
"2\tBob\t{\"phone\":\"222-2222\",\"address\":{\"city\":\"Los
Angeles\",\"zip\":90001}}\n");
+ sb.append(
+
"3\tCharlie\t{\"phone\":\"333-3333\",\"address\":{\"city\":\"Chicago\",\"zip\":60601}}\n");
+ }
+ return sb.toString();
+ }
+
+ private String buildExpectedTsvForArrayOfRowTable(int rowCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("id\tname\titems\n");
+ for (int i = 0; i < rowCount / 3; i++) {
+ sb.append(
+
"1\tOrder1\t[{\"item_name\":\"Apple\",\"quantity\":5},{\"item_name\":\"Banana\",\"quantity\":3}]\n");
+
sb.append("2\tOrder2\t[{\"item_name\":\"Orange\",\"quantity\":7}]\n");
+ sb.append(
+
"3\tOrder3\t[{\"item_name\":\"Apple\",\"quantity\":5},{\"item_name\":\"Banana\",\"quantity\":3},{\"item_name\":\"Orange\",\"quantity\":7}]\n");
+ }
+ return sb.toString();
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index a7a13ff5e..9a9b9c35b 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.CommitterInitContext;
+import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
@@ -355,4 +356,165 @@ class LanceTieringTest {
return schema;
}
+
+ @ParameterizedTest
+ @MethodSource("tieringWriteArgs")
+ void testTieringWriteTableWithNestedRowType(boolean isPartitioned) throws
Exception {
+ int bucketNum = 3;
+ TablePath tablePath = TablePath.of("lance", "nestedRowTable");
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("lance.batch_size", "256");
+ LanceConfig config =
+ LanceConfig.from(
+ configuration.toMap(),
+ customProperties,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
+ Schema schema = createNestedRowTable(config);
+
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(schema)
+ .distributedBy(bucketNum)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .customProperties(customProperties)
+ .build();
+ TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L,
1L);
+
+ List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+ SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+ lanceLakeTieringFactory.getWriteResultSerializer();
+ SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+ lanceLakeTieringFactory.getCommittableSerializer();
+
+ Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new
HashMap<>();
+ Map<Long, String> partitionIdAndName =
+ isPartitioned
+ ? new HashMap<Long, String>() {
+ {
+ put(1L, "p1");
+ put(2L, "p2");
+ put(3L, "p3");
+ }
+ }
+ : Collections.singletonMap(null, null);
+
+ // First, write data with nested row types
+ for (int bucket = 0; bucket < bucketNum; bucket++) {
+ for (Map.Entry<Long, String> entry :
partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
+ try (LakeWriter<LanceWriteResult> lakeWriter =
+ createLakeWriter(tablePath, bucket, partition,
tableInfo)) {
+ Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
+ Tuple2<List<LogRecord>, List<LogRecord>>
writeAndExpectRecords =
+ genNestedRowLogRecords(bucket, 10);
+ List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+ List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+ recordsByBucket.put(partitionBucket, expectRecords);
+ for (LogRecord logRecord : writtenRecords) {
+ lakeWriter.write(logRecord);
+ }
+ // serialize/deserialize writeResult
+ LanceWriteResult lanceWriteResult = lakeWriter.complete();
+ byte[] serialized =
writeResultSerializer.serialize(lanceWriteResult);
+ lanceWriteResults.add(
+ writeResultSerializer.deserialize(
+ writeResultSerializer.getVersion(),
serialized));
+ }
+ }
+ }
+
+ // Second, commit data
+ try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+ createLakeCommitter(tablePath, tableInfo)) {
+ // serialize/deserialize committable
+ LanceCommittable lanceCommittable =
lakeCommitter.toCommittable(lanceWriteResults);
+ byte[] serialized =
committableSerializer.serialize(lanceCommittable);
+ lanceCommittable =
+ committableSerializer.deserialize(
+ committableSerializer.getVersion(), serialized);
+ Map<String, String> snapshotProperties =
+
Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+ LakeCommitResult commitResult =
+ lakeCommitter.commit(lanceCommittable, snapshotProperties);
+ // lance dataset version starts from 1
+ assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
+ }
+
+ try (Dataset dataset =
+ Dataset.open(
+ new RootAllocator(),
+ config.getDatasetUri(),
+ LanceConfig.genReadOptionFromConfig(config))) {
+ ArrowReader reader = dataset.newScan().scanBatches();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+ // Verify data can be read back
+ for (int bucket = 0; bucket < 3; bucket++) {
+ for (String partition : partitionIdAndName.values()) {
+ reader.loadNextBatch();
+ Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
+ List<LogRecord> expectRecords =
recordsByBucket.get(partitionBucket);
+ verifyNestedRowRecords(readerRoot, expectRecords);
+ }
+ }
+ assertThat(reader.loadNextBatch()).isFalse();
+ }
+ }
+
+ private Schema createNestedRowTable(LanceConfig config) {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column(
+ "address",
+ DataTypes.ROW(
+ DataTypes.FIELD("city",
DataTypes.STRING()),
+ DataTypes.FIELD("zip",
DataTypes.INT())));
+ Schema schema = schemaBuilder.build();
+ WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+ LanceDatasetAdapter.createDataset(
+ config.getDatasetUri(),
LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+ return schema;
+ }
+
+ private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(
+ int bucket, int numRecords) {
+ List<LogRecord> logRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ GenericRow genericRow = new GenericRow(3);
+ genericRow.setField(0, i);
+ genericRow.setField(1, BinaryString.fromString("user" + bucket +
"_" + i));
+
+ // Create nested address row
+ GenericRow addressRow = new GenericRow(2);
+ addressRow.setField(0, BinaryString.fromString("city" + bucket));
+ addressRow.setField(1, 10000 + bucket);
+ genericRow.setField(2, addressRow);
+
+ LogRecord logRecord =
+ new GenericRecord(
+ i, System.currentTimeMillis(),
ChangeType.APPEND_ONLY, genericRow);
+ logRecords.add(logRecord);
+ }
+ return Tuple2.of(logRecords, logRecords);
+ }
+
+ private void verifyNestedRowRecords(VectorSchemaRoot root, List<LogRecord>
expectRecords) {
+ assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+ for (int i = 0; i < expectRecords.size(); i++) {
+ LogRecord expectRecord = expectRecords.get(i);
+ // check id column
+ assertThat((int) (root.getVector(0).getObject(i)))
+ .isEqualTo(expectRecord.getRow().getInt(0));
+ // check name column
+ assertThat(((VarCharVector)
root.getVector(1)).getObject(i).toString())
+ .isEqualTo(expectRecord.getRow().getString(1).toString());
+ // For nested row, just verify that the struct vector is not null
and has correct
+ // structure
+ assertThat(root.getVector(2)).isNotNull();
+ }
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
index d682b0d88..ef224ca7a 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
@@ -27,12 +27,13 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link LanceArrowUtils#toArrowSchema(RowType, Map)}. */
+/** Tests for {@link LanceArrowUtils}. */
class LanceArrowUtilsTest {
@Test
@@ -131,4 +132,170 @@ class LanceArrowUtilsTest {
Field embeddingField = schema.findField("embedding");
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.List.class);
}
+
+ @Test
+ void testToArrowSchemaWithNestedRowType() {
+ // Create a RowType with a nested Row field
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD(
+ "address",
+ DataTypes.ROW(
+ DataTypes.FIELD("city",
DataTypes.STRING()),
+ DataTypes.FIELD("zip",
DataTypes.INT()))));
+
+ Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
+
+ // Verify the schema has 3 fields
+ assertThat(arrowSchema.getFields()).hasSize(3);
+
+ // Verify the first two fields are simple types
+ Field idField = arrowSchema.getFields().get(0);
+ assertThat(idField.getName()).isEqualTo("id");
+ assertThat(idField.getType()).isInstanceOf(ArrowType.Int.class);
+
+ Field nameField = arrowSchema.getFields().get(1);
+ assertThat(nameField.getName()).isEqualTo("name");
+ assertThat(nameField.getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+
+ // Verify the nested row field
+ Field addressField = arrowSchema.getFields().get(2);
+ assertThat(addressField.getName()).isEqualTo("address");
+
assertThat(addressField.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+
+ // Verify the nested fields
+ List<Field> addressChildren = addressField.getChildren();
+ assertThat(addressChildren).hasSize(2);
+
+ Field cityField = addressChildren.get(0);
+ assertThat(cityField.getName()).isEqualTo("city");
+ assertThat(cityField.getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+
+ Field zipField = addressChildren.get(1);
+ assertThat(zipField.getName()).isEqualTo("zip");
+ assertThat(zipField.getType()).isInstanceOf(ArrowType.Int.class);
+ }
+
+ @Test
+ void testToArrowSchemaWithDeeplyNestedRowType() {
+ // Create a RowType with deeply nested Row fields
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD(
+ "level1",
+ DataTypes.ROW(
+ DataTypes.FIELD("name",
DataTypes.STRING()),
+ DataTypes.FIELD(
+ "level2",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "value",
DataTypes.DOUBLE()))))));
+
+ Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
+
+ // Verify the schema structure
+ assertThat(arrowSchema.getFields()).hasSize(2);
+
+ // Verify level1 struct
+ Field level1Field = arrowSchema.getFields().get(1);
+ assertThat(level1Field.getName()).isEqualTo("level1");
+ assertThat(level1Field.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+ assertThat(level1Field.getChildren()).hasSize(2);
+
+ // Verify level2 struct
+ Field level2Field = level1Field.getChildren().get(1);
+ assertThat(level2Field.getName()).isEqualTo("level2");
+ assertThat(level2Field.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+ assertThat(level2Field.getChildren()).hasSize(1);
+
+ // Verify value field in level2
+ Field valueField = level2Field.getChildren().get(0);
+ assertThat(valueField.getName()).isEqualTo("value");
+
assertThat(valueField.getType()).isInstanceOf(ArrowType.FloatingPoint.class);
+ }
+
+ @Test
+ void testToArrowSchemaWithArrayOfRowType() {
+ // Create a RowType with an array of rows
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD(
+ "items",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("name",
DataTypes.STRING()),
+ DataTypes.FIELD("price",
DataTypes.DOUBLE())))));
+
+ Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
+
+ // Verify the schema structure
+ assertThat(arrowSchema.getFields()).hasSize(2);
+
+ // Verify items array field
+ Field itemsField = arrowSchema.getFields().get(1);
+ assertThat(itemsField.getName()).isEqualTo("items");
+ assertThat(itemsField.getType()).isEqualTo(ArrowType.List.INSTANCE);
+
+ // Verify the element type is a struct
+ List<Field> itemsChildren = itemsField.getChildren();
+ assertThat(itemsChildren).hasSize(1);
+
+ Field elementField = itemsChildren.get(0);
+ assertThat(elementField.getName()).isEqualTo("element");
+
assertThat(elementField.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+
+ // Verify the struct fields
+ List<Field> structChildren = elementField.getChildren();
+ assertThat(structChildren).hasSize(2);
+
+ Field nameField = structChildren.get(0);
+ assertThat(nameField.getName()).isEqualTo("name");
+ assertThat(nameField.getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+
+ Field priceField = structChildren.get(1);
+ assertThat(priceField.getName()).isEqualTo("price");
+
assertThat(priceField.getType()).isInstanceOf(ArrowType.FloatingPoint.class);
+ }
+
+ @Test
+ void testToArrowSchemaWithRowContainingArray() {
+ // Create a RowType with a nested row that contains an array
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT()),
+ DataTypes.FIELD(
+ "data",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "tags",
DataTypes.ARRAY(DataTypes.STRING())),
+ DataTypes.FIELD("count",
DataTypes.INT()))));
+
+ Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
+
+ // Verify the schema structure
+ assertThat(arrowSchema.getFields()).hasSize(2);
+
+ // Verify data struct field
+ Field dataField = arrowSchema.getFields().get(1);
+ assertThat(dataField.getName()).isEqualTo("data");
+ assertThat(dataField.getType()).isEqualTo(ArrowType.Struct.INSTANCE);
+
+ // Verify tags array field within the struct
+ List<Field> dataChildren = dataField.getChildren();
+ assertThat(dataChildren).hasSize(2);
+
+ Field tagsField = dataChildren.get(0);
+ assertThat(tagsField.getName()).isEqualTo("tags");
+ assertThat(tagsField.getType()).isEqualTo(ArrowType.List.INSTANCE);
+
+ // Verify the array element type
+ List<Field> tagsChildren = tagsField.getChildren();
+ assertThat(tagsChildren).hasSize(1);
+ assertThat(tagsChildren.get(0).getName()).isEqualTo("element");
+
assertThat(tagsChildren.get(0).getType()).isEqualTo(ArrowType.Utf8.INSTANCE);
+ }
}