This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new aabe78a16 [iceberg] Use DataFileSet / DeleteFileSet instead of normal collections (#2155)
aabe78a16 is described below
commit aabe78a16b43e9be19bbb3b65bc548b70edfb0f4
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Dec 12 02:34:05 2025 +0100
[iceberg] Use DataFileSet / DeleteFileSet instead of normal collections (#2155)
---
.../maintenance/IcebergRewriteDataFiles.java | 10 +++++-----
.../iceberg/maintenance/RewriteDataFileResult.java | 13 ++++++------
.../lake/iceberg/tiering/IcebergCommittable.java | 23 +++++++++++-----------
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 11 +++--------
.../iceberg/maintenance/IcebergRewriteTest.java | 5 ++---
.../tiering/IcebergWriteResultSerializerTest.java | 5 +++--
6 files changed, 30 insertions(+), 37 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
index 4f93bf983..570c05603 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -24,7 +24,6 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -35,6 +34,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.DataFileSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,8 +153,8 @@ public class IcebergRewriteDataFiles {
return null;
}
LOG.info("Start to rewrite files {}.", tasksToRewrite);
- List<DataFile> deletedDataFiles = new ArrayList<>();
- List<DataFile> addedDataFiles = new ArrayList<>();
+ DataFileSet deletedDataFiles = DataFileSet.create();
+ DataFileSet addedDataFiles = DataFileSet.create();
for (CombinedScanTask combinedScanTask : tasksToRewrite) {
addedDataFiles.addAll(rewriteFileGroup(combinedScanTask));
deletedDataFiles.addAll(
@@ -172,7 +172,7 @@ public class IcebergRewriteDataFiles {
}
}
- private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask)
throws IOException {
+ private DataFileSet rewriteFileGroup(CombinedScanTask combinedScanTask)
throws IOException {
try (CloseableIterable<Record> records =
readDataFile(combinedScanTask);
TaskWriter<Record> taskWriter =
TaskWriterFactory.createTaskWriter(table, partition,
bucket.getBucket())) {
@@ -184,7 +184,7 @@ public class IcebergRewriteDataFiles {
rewriteResult.deleteFiles().length == 0,
"the delete files should be empty, but got "
+ Arrays.toString(rewriteResult.deleteFiles()));
- return Arrays.asList(rewriteResult.dataFiles());
+ return DataFileSet.of(Arrays.asList(rewriteResult.dataFiles()));
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
index 79e0dfe21..65f5d5b16 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
@@ -18,10 +18,9 @@
package org.apache.fluss.lake.iceberg.maintenance;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.DataFileSet;
import java.io.Serializable;
-import java.util.List;
/** The result for rewrite iceberg data files. */
public class RewriteDataFileResult implements Serializable {
@@ -29,21 +28,21 @@ public class RewriteDataFileResult implements Serializable {
private static final long serialVersionUID = 1L;
private final long snapshotId;
- private final List<DataFile> deletedDataFiles;
- private final List<DataFile> addedDataFiles;
+ private final DataFileSet deletedDataFiles;
+ private final DataFileSet addedDataFiles;
public RewriteDataFileResult(
- long snapshotId, List<DataFile> deletedDataFiles, List<DataFile>
addedDataFiles) {
+ long snapshotId, DataFileSet deletedDataFiles, DataFileSet
addedDataFiles) {
this.snapshotId = snapshotId;
this.deletedDataFiles = deletedDataFiles;
this.addedDataFiles = addedDataFiles;
}
- public List<DataFile> deletedDataFiles() {
+ public DataFileSet deletedDataFiles() {
return deletedDataFiles;
}
- public List<DataFile> addedDataFiles() {
+ public DataFileSet addedDataFiles() {
return addedDataFiles;
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
index f476f0ef3..503da4b21 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
@@ -21,6 +21,8 @@ import
org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.util.DataFileSet;
+import org.apache.iceberg.util.DeleteFileSet;
import java.io.Serializable;
import java.util.ArrayList;
@@ -31,25 +33,25 @@ public class IcebergCommittable implements Serializable {
private static final long serialVersionUID = 1L;
- private final List<DataFile> dataFiles;
- private final List<DeleteFile> deleteFiles;
+ private final DataFileSet dataFiles;
+ private final DeleteFileSet deleteFiles;
private final List<RewriteDataFileResult> rewriteDataFiles;
private IcebergCommittable(
- List<DataFile> dataFiles,
- List<DeleteFile> deleteFiles,
+ DataFileSet dataFiles,
+ DeleteFileSet deleteFiles,
List<RewriteDataFileResult> rewriteDataFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.rewriteDataFiles = rewriteDataFiles;
}
- public List<DataFile> getDataFiles() {
+ public DataFileSet getDataFiles() {
return dataFiles;
}
- public List<DeleteFile> getDeleteFiles() {
+ public DeleteFileSet getDeleteFiles() {
return deleteFiles;
}
@@ -66,8 +68,8 @@ public class IcebergCommittable implements Serializable {
* entries.
*/
public static class Builder {
- private final List<DataFile> dataFiles = new ArrayList<>();
- private final List<DeleteFile> deleteFiles = new ArrayList<>();
+ private final DataFileSet dataFiles = DataFileSet.create();
+ private final DeleteFileSet deleteFiles = DeleteFileSet.create();
private final List<RewriteDataFileResult> rewriteDataFileResults = new
ArrayList<>();
@@ -87,10 +89,7 @@ public class IcebergCommittable implements Serializable {
}
public IcebergCommittable build() {
- return new IcebergCommittable(
- new ArrayList<>(dataFiles),
- new ArrayList<>(deleteFiles),
- rewriteDataFileResults);
+ return new IcebergCommittable(dataFiles, deleteFiles,
rewriteDataFileResults);
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index ec36cb4b9..0444244a7 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -115,9 +114,7 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
if (committable.getDeleteFiles().isEmpty()) {
// Simple append-only case: only data files, no delete files
or compaction
AppendFiles appendFiles = icebergTable.newAppend();
- for (DataFile dataFile : committable.getDataFiles()) {
- appendFiles.appendFile(dataFile);
- }
+ committable.getDataFiles().forEach(appendFiles::appendFile);
snapshotUpdate = appendFiles;
} else {
/*
@@ -131,10 +128,8 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
being committed.
*/
RowDelta rowDelta = icebergTable.newRowDelta();
-
Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
- .forEach(rowDelta::addRows);
-
Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
- .forEach(rowDelta::addDeletes);
+ committable.getDataFiles().forEach(rowDelta::addRows);
+ committable.getDeleteFiles().forEach(rowDelta::addDeletes);
snapshotUpdate = rowDelta;
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
index 264588bee..8d61e2533 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
@@ -41,14 +41,13 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DataFileSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -232,7 +231,7 @@ class IcebergRewriteTest {
private static void appendTinyFilesWithRowsAndBucket(
Table table, int files, int rowsPerFile, int baseOffset, int
bucket) throws Exception {
- List<DataFile> toAppend = new ArrayList<>(files);
+ DataFileSet toAppend = DataFileSet.create();
for (int i = 0; i < files; i++) {
toAppend.add(
writeTinyDataFile(table, rowsPerFile, baseOffset + (i *
rowsPerFile), bucket));
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
index d44125857..b6d8d32e6 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DataFileSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -80,8 +81,8 @@ class IcebergWriteResultSerializerTest {
RewriteDataFileResult rewriteDataFileResult =
new RewriteDataFileResult(
1L,
- Collections.singletonList(dataFile),
- Collections.singletonList(dataFile));
+ DataFileSet.of(Collections.singletonList(dataFile)),
+ DataFileSet.of(Collections.singletonList(dataFile)));
originalResult = new IcebergWriteResult(writeResult,
rewriteDataFileResult);
serializedData = serializer.serialize(originalResult);
deserializedResult = serializer.deserialize(serializer.getVersion(),
serializedData);