Copilot commented on code in PR #2537:
URL: https://github.com/apache/fluss/pull/2537#discussion_r2750746497


##########
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -322,4 +325,140 @@ private Schema createTable(LanceConfig config) {
 
         return schema;
     }
+
+    @Test
+    void testTieringWriteTableWithArrayType() throws Exception {
+        int bucketNum = 2;
+        TablePath tablePath = TablePath.of("lance", "arrayTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+
+        List<Schema.Column> columns = new ArrayList<>();
+        columns.add(new Schema.Column("id", DataTypes.INT()));
+        columns.add(new Schema.Column("name", DataTypes.STRING()));
+        columns.add(new Schema.Column("tags", 
DataTypes.ARRAY(DataTypes.STRING())));
+        columns.add(new Schema.Column("scores", 
DataTypes.ARRAY(DataTypes.INT())));
+        Schema.Builder schemaBuilder = 
Schema.newBuilder().fromColumns(columns);
+        Schema schema = schemaBuilder.build();
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            try (LakeWriter<LanceWriteResult> lakeWriter =
+                    createLakeWriter(tablePath, bucket, null, tableInfo)) {
+                List<LogRecord> writtenRecords = genArrayTableRecords(bucket, 5);
+                recordsByBucket.put(bucket, writtenRecords);
+                for (LogRecord logRecord : writtenRecords) {
+                    lakeWriter.write(logRecord);
+                }
+                LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                lanceWriteResults.add(
+                        writeResultSerializer.deserialize(
+                                writeResultSerializer.getVersion(), serialized));
+            }
+        }
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            long snapshot = lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            assertThat(snapshot).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                reader.loadNextBatch();
+                List<LogRecord> expectRecords = recordsByBucket.get(bucket);
+                verifyArrayTableRecords(readerRoot, expectRecords);
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }

Review Comment:
   The test only covers non-null arrays with values. Consider adding test cases 
for edge cases such as:
   1. Null array values (e.g., `genericRow.setField(2, null)`)
   2. Empty arrays (e.g., `new GenericArray(new BinaryString[0])`)
   3. Nested arrays (e.g., `DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))`)
   
   These edge cases are important to verify that the Arrow schema conversion 
and data copying logic handle all scenarios correctly.
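   A rough sketch of how the first two cases could be generated, reusing the same `GenericRow`/`GenericArray`/`GenericRecord` helpers as the existing generator (whether null array values are accepted also depends on the column nullability declared in the schema; the nested-array case would additionally need an `ARRAY(ARRAY(INT))` column, so it is omitted here):

   ```java
   // Sketch only: rows exercising null and empty arrays for the same 4-column schema.
   private List<LogRecord> genArrayEdgeCaseRecords() {
       List<LogRecord> logRecords = new ArrayList<>();

       // Edge case 1: null values for both array columns.
       GenericRow nullArrays = new GenericRow(4);
       nullArrays.setField(0, 0);
       nullArrays.setField(1, BinaryString.fromString("null_arrays"));
       nullArrays.setField(2, null);
       nullArrays.setField(3, null);
       logRecords.add(
               new GenericRecord(0, System.currentTimeMillis(), ChangeType.APPEND_ONLY, nullArrays));

       // Edge case 2: empty arrays for both array columns.
       GenericRow emptyArrays = new GenericRow(4);
       emptyArrays.setField(0, 1);
       emptyArrays.setField(1, BinaryString.fromString("empty_arrays"));
       emptyArrays.setField(2, new GenericArray(new BinaryString[0]));
       emptyArrays.setField(3, new GenericArray(new Integer[0]));
       logRecords.add(
               new GenericRecord(1, System.currentTimeMillis(), ChangeType.APPEND_ONLY, emptyArrays));

       return logRecords;
   }
   ```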



##########
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -322,4 +325,140 @@ private Schema createTable(LanceConfig config) {
 
         return schema;
     }
+
+    @Test
+    void testTieringWriteTableWithArrayType() throws Exception {
+        int bucketNum = 2;
+        TablePath tablePath = TablePath.of("lance", "arrayTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+
+        List<Schema.Column> columns = new ArrayList<>();
+        columns.add(new Schema.Column("id", DataTypes.INT()));
+        columns.add(new Schema.Column("name", DataTypes.STRING()));
+        columns.add(new Schema.Column("tags", 
DataTypes.ARRAY(DataTypes.STRING())));
+        columns.add(new Schema.Column("scores", 
DataTypes.ARRAY(DataTypes.INT())));
+        Schema.Builder schemaBuilder = 
Schema.newBuilder().fromColumns(columns);
+        Schema schema = schemaBuilder.build();
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            try (LakeWriter<LanceWriteResult> lakeWriter =
+                    createLakeWriter(tablePath, bucket, null, tableInfo)) {
+                List<LogRecord> writtenRecords = genArrayTableRecords(bucket, 5);
+                recordsByBucket.put(bucket, writtenRecords);
+                for (LogRecord logRecord : writtenRecords) {
+                    lakeWriter.write(logRecord);
+                }
+                LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                lanceWriteResults.add(
+                        writeResultSerializer.deserialize(
+                                writeResultSerializer.getVersion(), serialized));
+            }
+        }
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            long snapshot = lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            assertThat(snapshot).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                reader.loadNextBatch();
+                List<LogRecord> expectRecords = recordsByBucket.get(bucket);
+                verifyArrayTableRecords(readerRoot, expectRecords);
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }
+
+    private List<LogRecord> genArrayTableRecords(int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow = new GenericRow(4);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("user_" + bucket + 
"_" + i));
+
+            String[] tagsArray = new String[] {"tag" + i, "tag" + (i + 1), "category" + bucket};
+            BinaryString[] binaryTags = new BinaryString[tagsArray.length];
+            for (int j = 0; j < tagsArray.length; j++) {
+                binaryTags[j] = BinaryString.fromString(tagsArray[j]);
+            }
+            GenericArray tags = new GenericArray(binaryTags);
+            genericRow.setField(2, tags);
+
+            Integer[] scoresArray = new Integer[] {i * 10, i * 20, i * 30};
+            GenericArray scores = new GenericArray(scoresArray);
+            genericRow.setField(3, scores);
+
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return logRecords;
+    }
+
+    private void verifyArrayTableRecords(VectorSchemaRoot root, List<LogRecord> expectRecords)
+            throws Exception {
+        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+        for (int i = 0; i < expectRecords.size(); i++) {
+            LogRecord expectRecord = expectRecords.get(i);
+
+            assertThat((int) (root.getVector(0).getObject(i)))
+                    .isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+
+            ListVector tagsVector = (ListVector) root.getVector(2);
+            Object tagsObject = tagsVector.getObject(i);
+            assertThat(tagsObject).isNotNull();
+
+            ListVector scoresVector = (ListVector) root.getVector(3);
+            Object scoresObject = scoresVector.getObject(i);
+            assertThat(scoresObject).isNotNull();

Review Comment:
   The test verification only checks that array objects are not null, but 
doesn't verify the actual array contents. This means the test would pass even 
if the arrays are empty or contain incorrect values. The verification should 
compare the actual array elements against the expected values from the 
generated records. For example, for the tags array, you should extract the list 
of strings and compare them with the expected `BinaryString` values from 
`expectRecord`.
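   A minimal sketch of what that element-by-element comparison could look like, replacing the `isNotNull()` checks above. It assumes Arrow's `ListVector#getObject` returns a `java.util.List` (with `Text` elements for the string child and `Integer` for the int child), and that the Fluss row exposes array accessors (`getArray`, `size`, `getString`, `getInt`) analogous to the scalar getters already used in this test; adjust the names if the actual API differs:

   ```java
   // Sketch only: compare the actual Arrow list contents with the expected row values.
   ListVector tagsVector = (ListVector) root.getVector(2);
   List<?> actualTags = (List<?>) tagsVector.getObject(i);
   InternalArray expectTags = expectRecord.getRow().getArray(2); // assumed accessor
   assertThat(actualTags).hasSize(expectTags.size());
   for (int j = 0; j < expectTags.size(); j++) {
       // Arrow returns Text for VarChar elements; compare via toString.
       assertThat(actualTags.get(j).toString())
               .isEqualTo(expectTags.getString(j).toString());
   }

   ListVector scoresVector = (ListVector) root.getVector(3);
   List<?> actualScores = (List<?>) scoresVector.getObject(i);
   InternalArray expectScores = expectRecord.getRow().getArray(3); // assumed accessor
   assertThat(actualScores).hasSize(expectScores.size());
   for (int j = 0; j < expectScores.size(); j++) {
       assertThat((Integer) actualScores.get(j)).isEqualTo(expectScores.getInt(j));
   }
   ```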



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
