This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new dbbfc79c0 [lake/iceberg] Introduce IcebergRewriteDataFiles to compact files (#1552)
dbbfc79c0 is described below
commit dbbfc79c0a8402a7354878af933eaa99ecb90008
Author: MehulBatra <[email protected]>
AuthorDate: Wed Aug 27 11:41:01 2025 +0530
[lake/iceberg] Introduce IcebergRewriteDataFiles to compact files (#1552)
---------
Co-authored-by: luoyuxia <[email protected]>
---
fluss-lake/fluss-lake-iceberg/pom.xml | 1 -
.../maintenance/IcebergRewriteDataFiles.java | 182 ++++++++++++++
.../iceberg/maintenance/RewriteDataFileResult.java | 56 +++++
.../lake/iceberg/tiering/IcebergCommittable.java | 32 ++-
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 102 ++++++--
.../lake/iceberg/tiering/IcebergLakeWriter.java | 133 +++++++---
.../lake/iceberg/tiering/IcebergWriteResult.java | 20 +-
.../tiering/IcebergWriteResultSerializer.java | 59 ++++-
.../tiering/writer/AppendOnlyTaskWriter.java | 29 +--
.../iceberg/tiering/writer/DeltaTaskWriter.java | 49 +---
.../writer/GenericRecordAppendOnlyWriter.java | 14 +-
.../tiering/writer/GenericRecordDeltaWriter.java | 12 +-
.../iceberg/tiering/writer/TaskWriterFactory.java | 111 +++++++++
.../lake/iceberg/utils/IcebergConversions.java | 27 +++
.../apache/iceberg/data/IcebergGenericReader.java | 33 +++
.../iceberg/maintenance/IcebergRewriteTest.java | 268 +++++++++++++++++++++
.../tiering/IcebergWriteResultSerializerTest.java | 109 +++++++++
17 files changed, 1073 insertions(+), 164 deletions(-)
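
At a glance, this commit wires small-file compaction into the Iceberg tiering path: the lake writer schedules an asynchronous bin-packing rewrite per bucket, the rewrite result travels alongside the normal write result, and the committer applies it as a separate rewrite snapshot. A minimal sketch of driving the new API directly (illustrative only; the catalog/table wiring and the table identifier are assumptions):

    // Plan and execute a bin-packing rewrite for one bucket, then commit it as a
    // rewrite snapshot (mirrors commitRewrite in IcebergRewriteTest below).
    Table table = icebergCatalog.loadTable(TableIdentifier.of("iceberg", "my_table"));
    IcebergRewriteDataFiles rewrite =
            new IcebergRewriteDataFiles(table, null /* non-partitioned */, new TableBucket(0, 0))
                    .targetSizeInBytes(128 * 1024 * 1024);
    RewriteDataFileResult result = rewrite.execute(); // null when there is nothing to compact
    if (result != null) {
        RewriteFiles rewriteFiles = table.newRewrite();
        result.deletedDataFiles().forEach(rewriteFiles::deleteFile);
        result.addedDataFiles().forEach(rewriteFiles::addFile);
        rewriteFiles.commit();
    }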
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml b/fluss-lake/fluss-lake-iceberg/pom.xml
index 5ad616def..f80cec66b 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -31,7 +31,6 @@
<name>Fluss : Lake : Iceberg</name>
<packaging>jar</packaging>
-
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
new file mode 100644
index 000000000..a74507655
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.metadata.TableBucket;
+
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.BinPacking;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toFilterExpression;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Concrete implementation for Fluss's Iceberg integration. Handles bin-packing compaction of small
+ * files into larger ones.
+ */
+public class IcebergRewriteDataFiles {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
+
+ private static final int MIN_FILES_TO_COMPACT = 3;
+
+ private final Table table;
+ private final String partition;
+ private final TableBucket bucket;
+ private final Expression filter;
+ private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+ public IcebergRewriteDataFiles(Table table, @Nullable String partition, TableBucket bucket) {
+ this.table = table;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.filter = toFilterExpression(table, partition, bucket.getBucket());
+ }
+
+ public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) {
+ this.targetSizeInBytes = targetSize;
+ return this;
+ }
+
+ private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
+ List<FileScanTask> fileScanTasks = new ArrayList<>();
+ try (CloseableIterable<FileScanTask> tasks =
+ table.newScan().includeColumnStats().filter(filter).ignoreResiduals().planFiles()) {
+ tasks.forEach(fileScanTasks::add);
+ }
+
+ // if the number of files smaller than targetSizeInBytes is less than
+ // MIN_FILES_TO_COMPACT, don't compact
+ if (fileScanTasks.stream()
+ .filter(fileScanTask -> fileScanTask.length() < targetSizeInBytes)
+ .count()
+ < MIN_FILES_TO_COMPACT) {
+ // return empty file group
+ return Collections.emptyList();
+ }
+
+ // then, pack the fileScanTasks into compaction units containing compactable
+ // fileScanTasks; after compaction, we still want the data ordered by the __offset
+ // column, so first sort by the __offset column
+ int offsetFieldId = table.schema().findField(OFFSET_COLUMN_NAME).fieldId();
+ fileScanTasks.sort(sortFileScanTask(offsetFieldId));
+
+ // do the bin packing now
+ BinPacking.ListPacker<FileScanTask> packer =
+ new BinPacking.ListPacker<>(targetSizeInBytes, 1, false);
+ return packer.pack(fileScanTasks, ContentScanTask::length).stream()
+ .filter(tasks -> tasks.size() > 1)
+ .map(BaseCombinedScanTask::new)
+ .collect(Collectors.toList());
+ }
+
+ private Comparator<FileScanTask> sortFileScanTask(int sortFieldId) {
+ return (f1, f2) -> {
+ ByteBuffer buffer1 =
+ f1.file()
+ .lowerBounds()
+ .get(sortFieldId)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .rewind();
+ long offset1 = buffer1.getLong();
+ ByteBuffer buffer2 =
+ f2.file()
+ .lowerBounds()
+ .get(sortFieldId)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .rewind();
+ long offset2 = buffer2.getLong();
+ return Long.compare(offset1, offset2);
+ };
+ }
+
+ @Nullable
+ public RewriteDataFileResult execute() {
+ try {
+ // plan the file groups to be rewritten
+ List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups();
+ if (tasksToRewrite.isEmpty()) {
+ return null;
+ }
+ LOG.info("Start to rewrite files {}.", tasksToRewrite);
+ List<DataFile> deletedDataFiles = new ArrayList<>();
+ List<DataFile> addedDataFiles = new ArrayList<>();
+ for (CombinedScanTask combinedScanTask : tasksToRewrite) {
+ addedDataFiles.addAll(rewriteFileGroup(combinedScanTask));
+ deletedDataFiles.addAll(
+ combinedScanTask.files().stream()
+ .map(ContentScanTask::file)
+ .collect(Collectors.toList()));
+ }
+ LOG.info("Finish rewriting files from {} to {}.",
deletedDataFiles, addedDataFiles);
+ return new RewriteDataFileResult(deletedDataFiles, addedDataFiles);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Fail to compact bucket %s of table %s.",
bucket, table.name()),
+ e);
+ }
+ }
+
+ private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask) throws IOException {
+ try (CloseableIterable<Record> records = readDataFile(combinedScanTask);
+ TaskWriter<Record> taskWriter =
+ TaskWriterFactory.createTaskWriter(table, partition, bucket.getBucket())) {
+ for (Record record : records) {
+ taskWriter.write(record);
+ }
+ WriteResult rewriteResult = taskWriter.complete();
+ checkState(
+ rewriteResult.deleteFiles().length == 0,
+ "the delete files should be empty, but got "
+ + Arrays.toString(rewriteResult.deleteFiles()));
+ return Arrays.asList(rewriteResult.dataFiles());
+ }
+ }
+
+ private CloseableIterable<Record> readDataFile(CombinedScanTask combinedScanTask) {
+ IcebergGenericReader reader = new IcebergGenericReader(table.newScan(), true);
+ return reader.open(combinedScanTask);
+ }
+}
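
A note on sortFileScanTask: it orders files by each file's minimum __offset value taken from column statistics (available because the scan uses includeColumnStats()). Iceberg stores lower/upper bounds in its single-value binary serialization, an 8-byte little-endian buffer for longs; Iceberg's Conversions helper decodes the same layout, so an equivalent helper could be sketched as follows (minOffsetOf is hypothetical, not part of this commit):

    // Hypothetical equivalent of the inline lower-bound decoding in sortFileScanTask.
    static long minOffsetOf(org.apache.iceberg.FileScanTask task, int offsetFieldId) {
        // lowerBounds() maps field id -> serialized minimum value for that column
        Long min =
                org.apache.iceberg.types.Conversions.fromByteBuffer(
                        org.apache.iceberg.types.Types.LongType.get(),
                        task.file().lowerBounds().get(offsetFieldId));
        return min;
    }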
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
new file mode 100644
index 000000000..e24580cab
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.iceberg.DataFile;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** The result of rewriting Iceberg data files. */
+public class RewriteDataFileResult implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<DataFile> deletedDataFiles;
+ private final List<DataFile> addedDataFiles;
+
+ public RewriteDataFileResult(List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
+ this.deletedDataFiles = deletedDataFiles;
+ this.addedDataFiles = addedDataFiles;
+ }
+
+ public List<DataFile> deletedDataFiles() {
+ return deletedDataFiles;
+ }
+
+ public List<DataFile> addedDataFiles() {
+ return addedDataFiles;
+ }
+
+ @Override
+ public String toString() {
+ return "RewriteDataFileResult{"
+ + "deletedDataFiles="
+ + deletedDataFiles
+ + ", addedDataFiles="
+ + addedDataFiles
+ + '}';
+ }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
index 1019eab45..f476f0ef3 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
@@ -17,6 +17,8 @@
package org.apache.fluss.lake.iceberg.tiering;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -32,9 +34,15 @@ public class IcebergCommittable implements Serializable {
private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
- private IcebergCommittable(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
+ private final List<RewriteDataFileResult> rewriteDataFiles;
+
+ private IcebergCommittable(
+ List<DataFile> dataFiles,
+ List<DeleteFile> deleteFiles,
+ List<RewriteDataFileResult> rewriteDataFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
+ this.rewriteDataFiles = rewriteDataFiles;
}
public List<DataFile> getDataFiles() {
@@ -45,6 +53,10 @@ public class IcebergCommittable implements Serializable {
return deleteFiles;
}
+ public List<RewriteDataFileResult> rewriteDataFileResults() {
+ return rewriteDataFiles;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -57,6 +69,8 @@ public class IcebergCommittable implements Serializable {
private final List<DataFile> dataFiles = new ArrayList<>();
private final List<DeleteFile> deleteFiles = new ArrayList<>();
+ private final List<RewriteDataFileResult> rewriteDataFileResults = new ArrayList<>();
+
public Builder addDataFile(DataFile dataFile) {
this.dataFiles.add(dataFile);
return this;
@@ -67,8 +81,16 @@ public class IcebergCommittable implements Serializable {
return this;
}
+ public Builder addRewriteDataFileResult(RewriteDataFileResult rewriteDataFileResult) {
+ this.rewriteDataFileResults.add(rewriteDataFileResult);
+ return this;
+ }
+
public IcebergCommittable build() {
- return new IcebergCommittable(new ArrayList<>(dataFiles), new ArrayList<>(deleteFiles));
+ return new IcebergCommittable(
+ new ArrayList<>(dataFiles),
+ new ArrayList<>(deleteFiles),
+ rewriteDataFileResults);
}
}
@@ -76,9 +98,11 @@ public class IcebergCommittable implements Serializable {
public String toString() {
return "IcebergCommittable{"
+ "dataFiles="
- + dataFiles.size()
+ + dataFiles
+ ", deleteFiles="
- + deleteFiles.size()
+ + deleteFiles
+ + "rewriteDataFiles = "
+ + rewriteDataFiles
+ '}';
}
}
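
The builder mirrors how the committer aggregates per-writer results; a short usage sketch (variable names are illustrative, following toIcebergCommittable in IcebergLakeCommitter below):

    // Fold one writer's results into a committable.
    IcebergCommittable.Builder builder = IcebergCommittable.builder();
    for (DataFile dataFile : writeResult.dataFiles()) {
        builder.addDataFile(dataFile);
    }
    for (DeleteFile deleteFile : writeResult.deleteFiles()) {
        builder.addDeleteFile(deleteFile);
    }
    if (rewriteDataFileResult != null) { // compaction may have produced nothing
        builder.addRewriteDataFileResult(rewriteDataFileResult);
    }
    IcebergCommittable committable = builder.build();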
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index b87081ad5..604f6cc6b 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.iceberg.tiering;
import org.apache.fluss.lake.committer.BucketOffset;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
@@ -39,6 +41,8 @@ import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.events.Listener;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.io.WriteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -56,6 +60,9 @@ import static org.apache.fluss.utils.Preconditions.checkNotNull;
/** Implementation of {@link LakeCommitter} for Iceberg. */
public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, IcebergCommittable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergLakeCommitter.class);
+
private static final String COMMITTER_USER = "commit-user";
private final Catalog icebergCatalog;
@@ -87,6 +94,11 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
for (DeleteFile deleteFile : writeResult.deleteFiles()) {
builder.addDeleteFile(deleteFile);
}
+
+ RewriteDataFileResult rewriteDataFileResult = result.rewriteDataFileResult();
+ if (rewriteDataFileResult != null) {
+ builder.addRewriteDataFileResult(rewriteDataFileResult);
+ }
}
return builder.build();
@@ -99,52 +111,92 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
// Refresh table to get latest metadata
icebergTable.refresh();
+ SnapshotUpdate<?> snapshotUpdate;
if (committable.getDeleteFiles().isEmpty()) {
// Simple append-only case: only data files, no delete files or compaction
AppendFiles appendFiles = icebergTable.newAppend();
for (DataFile dataFile : committable.getDataFiles()) {
appendFiles.appendFile(dataFile);
}
-
- addFlussProperties(appendFiles, snapshotProperties);
-
- appendFiles.commit();
+ snapshotUpdate = appendFiles;
} else {
- /**
- * Row delta validations are not needed for streaming changes that write equality
- * deletes. Equality deletes are applied to data in all previous sequence numbers,
- * so retries may push deletes further in the future, but do not affect correctness.
- * Position deletes committed to the table in this path are used only to delete rows
- * from data files that are being added in this commit. There is no way for data
- * files added along with the delete files to be concurrently removed, so there is
- * no need to validate the files referenced by the position delete files that are
- * being committed.
- */
+ /*
+ Row delta validations are not needed for streaming changes that write equality
+ deletes. Equality deletes are applied to data in all previous sequence numbers,
+ so retries may push deletes further in the future, but do not affect correctness.
+ Position deletes committed to the table in this path are used only to delete rows
+ from data files that are being added in this commit. There is no way for data
+ files added along with the delete files to be concurrently removed, so there is
+ no need to validate the files referenced by the position delete files that are
+ being committed.
+ */
RowDelta rowDelta = icebergTable.newRowDelta();
Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
.forEach(rowDelta::addRows);
Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
.forEach(rowDelta::addDeletes);
- snapshotProperties.forEach(rowDelta::set);
- rowDelta.commit();
+ snapshotUpdate = rowDelta;
}
- Long commitSnapshotId = currentCommitSnapshotId.get();
- currentCommitSnapshotId.remove();
-
- return checkNotNull(
- commitSnapshotId, "Iceberg committed snapshot id must be non-null.");
+ // commit written files
+ Long snapshotId = commit(snapshotUpdate, snapshotProperties);
+
+ // if there are rewrite files, commit them as well
+ List<RewriteDataFileResult> rewriteDataFileResults =
+ committable.rewriteDataFileResults();
+ if (!rewriteDataFileResults.isEmpty()) {
+ Long rewriteCommitSnapshotId =
+ commitRewrite(rewriteDataFileResults, snapshotProperties);
+ if (rewriteCommitSnapshotId != null) {
+ snapshotId = rewriteCommitSnapshotId;
+ }
+ }
+ return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");
} catch (Exception e) {
throw new IOException("Failed to commit to Iceberg table.", e);
}
}
- private void addFlussProperties(
- SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) {
- operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
+ private Long commitRewrite(
+ List<RewriteDataFileResult> rewriteDataFileResults,
+ Map<String, String> snapshotProperties) {
+ icebergTable.refresh();
+ RewriteFiles rewriteFiles = icebergTable.newRewrite();
+ for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
+ rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+ rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+ }
+ try {
+ return commit(rewriteFiles, snapshotProperties);
+ } catch (Exception e) {
+ List<String> rewriteAddedDataFiles =
+ rewriteDataFileResults.stream()
+ .flatMap(
+ rewriteDataFileResult ->
+ rewriteDataFileResult.addedDataFiles().stream())
+ .map(dataFile -> dataFile.path().toString())
+ .collect(Collectors.toList());
+ LOG.error(
+ "Failed to commit rewrite files to iceberg, delete rewrite
added files {}.",
+ rewriteAddedDataFiles,
+ e);
+ // we need to abort new rewrite files
+ CatalogUtil.deleteFiles(icebergTable.io(), rewriteAddedDataFiles, "data file", true);
+ return null;
+ }
+ }
+
+ private Long commit(SnapshotUpdate<?> snapshotUpdate, Map<String, String> snapshotProperties) {
+ // add snapshot properties
+ snapshotUpdate.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
- operation.set(entry.getKey(), entry.getValue());
+ snapshotUpdate.set(entry.getKey(), entry.getValue());
}
+ // do commit
+ snapshotUpdate.commit();
+ Long commitSnapshotId = currentCommitSnapshotId.get();
+ currentCommitSnapshotId.remove();
+ return commitSnapshotId;
}
@Override
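
A design note on the commit path: tiered data (append or row delta) is committed first, and the rewrite is committed as a second, independent snapshot, so a failing rewrite can never lose tiered data; commitRewrite logs the failure, deletes the rewrite-added files, and returns null. Condensed control flow (a sketch, not the literal method body):

    // Illustrative: commit data first, then opportunistically commit the rewrite.
    Long snapshotId = commit(snapshotUpdate, snapshotProperties); // data/delta snapshot
    Long rewriteSnapshotId = commitRewrite(rewriteDataFileResults, snapshotProperties);
    if (rewriteSnapshotId != null) {
        snapshotId = rewriteSnapshotId; // report the newest snapshot id
    }
    return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");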
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index c5765b7d1..da9d78f71 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,37 +17,54 @@
package org.apache.fluss.lake.iceberg.tiering;
+import org.apache.fluss.lake.iceberg.maintenance.IcebergRewriteDataFiles;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
import org.apache.fluss.lake.iceberg.tiering.writer.AppendOnlyTaskWriter;
import org.apache.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
/** Implementation of {@link LakeWriter} for Iceberg. */
public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
+ protected static final Logger LOG = LoggerFactory.getLogger(IcebergLakeWriter.class);
+
+ private static final String AUTO_MAINTENANCE_KEY = "table.datalake.auto-maintenance";
+
private final Catalog icebergCatalog;
private final Table icebergTable;
private final RecordWriter recordWriter;
+ private final boolean autoMaintenanceEnabled;
+
+ @Nullable private final ExecutorService compactionExecutor;
+ @Nullable private CompletableFuture<RewriteDataFileResult> compactionFuture;
public IcebergLakeWriter(
IcebergCatalogProvider icebergCatalogProvider, WriterInitContext writerInitContext)
@@ -55,33 +72,37 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
this.icebergCatalog = icebergCatalogProvider.get();
this.icebergTable = getTable(writerInitContext.tablePath());
- // Create record writer based on table type
- // For now, only supporting non-partitioned append-only tables
+ // Check auto-maintenance from table properties
+ this.autoMaintenanceEnabled =
+ Boolean.parseBoolean(
+ icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
+
+ // Create a record writer
this.recordWriter = createRecordWriter(writerInitContext);
+
+ if (autoMaintenanceEnabled) {
+ this.compactionExecutor =
+ Executors.newSingleThreadExecutor(
+ new ExecutorThreadFactory(
+ "iceberg-compact-" +
writerInitContext.tableBucket()));
+ scheduleCompaction(writerInitContext);
+ } else {
+ this.compactionExecutor = null;
+ }
}
private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
- Schema schema = icebergTable.schema();
- List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
-
- // Get target file size from table properties
- long targetFileSize = targetFileSize(icebergTable);
- FileFormat format = fileFormat(icebergTable);
- OutputFileFactory outputFileFactory =
- OutputFileFactory.builderFor(
- icebergTable,
- writerInitContext.tableBucket().getBucket(),
- // task id always 0
- 0)
- .format(format)
- .build();
-
+ List<Integer> equalityFieldIds =
+ new ArrayList<>(icebergTable.schema().identifierFieldIds());
+ TaskWriter<Record> taskWriter =
+ TaskWriterFactory.createTaskWriter(
+ icebergTable,
+ writerInitContext.partition(),
+ writerInitContext.tableBucket().getBucket());
if (equalityFieldIds.isEmpty()) {
- return new AppendOnlyTaskWriter(
- icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
+ return new AppendOnlyTaskWriter(icebergTable, writerInitContext, taskWriter);
} else {
- return new DeltaTaskWriter(
- icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
+ return new DeltaTaskWriter(icebergTable, writerInitContext, taskWriter);
}
}
@@ -98,7 +119,12 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
public IcebergWriteResult complete() throws IOException {
try {
WriteResult writeResult = recordWriter.complete();
- return new IcebergWriteResult(writeResult);
+
+ RewriteDataFileResult rewriteDataFileResult = null;
+ if (compactionFuture != null) {
+ rewriteDataFileResult = compactionFuture.get();
+ }
+ return new IcebergWriteResult(writeResult, rewriteDataFileResult);
} catch (Exception e) {
throw new IOException("Failed to complete Iceberg write.", e);
}
@@ -107,6 +133,15 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
@Override
public void close() throws IOException {
try {
+ if (compactionFuture != null && !compactionFuture.isDone()) {
+ compactionFuture.cancel(true);
+ }
+
+ if (compactionExecutor != null) {
+ compactionExecutor.shutdown();
+ if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOG.warn("Failed to shut down compactionExecutor within 30 seconds.");
+ }
+ }
+
if (recordWriter != null) {
recordWriter.close();
}
@@ -126,19 +161,39 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
}
}
- private static FileFormat fileFormat(Table icebergTable) {
- String formatString =
- PropertyUtil.propertyAsString(
- icebergTable.properties(),
- TableProperties.DEFAULT_FILE_FORMAT,
- TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.fromString(formatString);
+ private void scheduleCompaction(WriterInitContext writerInitContext) {
+ compactionFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Table table = icebergTable;
+ IcebergRewriteDataFiles rewriter =
+ new IcebergRewriteDataFiles(
+ table,
+ writerInitContext.partition(),
+ writerInitContext.tableBucket())
+ .targetSizeInBytes(compactionTargetSize(table));
+ return rewriter.execute();
+ } catch (Exception e) {
+ LOG.info("Failed to compact Iceberg data files.", e);
+ // swallow and return null to avoid failing the write path
+ return null;
+ }
+ },
+ compactionExecutor);
+ }
- private static long targetFileSize(Table icebergTable) {
- return PropertyUtil.propertyAsLong(
- icebergTable.properties(),
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ private static long compactionTargetSize(Table icebergTable) {
+ long splitSize =
+ PropertyUtil.propertyAsLong(
+ icebergTable.properties(),
+ TableProperties.SPLIT_SIZE,
+ TableProperties.SPLIT_SIZE_DEFAULT);
+ long targetFileSize =
+ PropertyUtil.propertyAsLong(
+ icebergTable.properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ return Math.min(splitSize, targetFileSize);
}
}
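
On compactionTargetSize: capping the rewrite target at min(split size, write target file size) presumably keeps a compacted file from spanning multiple read splits. With Iceberg's default property values (an assumption about the deployed defaults) the result works out to 128 MB:

    // Assumed Iceberg defaults: read.split.target-size = 128 MB,
    // write.target-file-size-bytes = 512 MB.
    long splitSize = 128L * 1024 * 1024;
    long targetFileSize = 512L * 1024 * 1024;
    long compactionTarget = Math.min(splitSize, targetFileSize); // 134217728 bytes (128 MB)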
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
index cd2825ce5..2b36c87c2 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
@@ -17,8 +17,12 @@
package org.apache.fluss.lake.iceberg.tiering;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
import org.apache.iceberg.io.WriteResult;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
/** The write result of the Iceberg lake writer, passed to the committer to commit. */
@@ -26,16 +30,27 @@ public class IcebergWriteResult implements Serializable {
private static final long serialVersionUID = 1L;
+ // the normal result of tiering writes to Iceberg
private final WriteResult writeResult;
- public IcebergWriteResult(WriteResult writeResult) {
+ // the rewrite result
+ @Nullable private final RewriteDataFileResult rewriteDataFileResult;
+
+ public IcebergWriteResult(
+ WriteResult writeResult, @Nullable RewriteDataFileResult rewriteDataFileResult) {
this.writeResult = writeResult;
+ this.rewriteDataFileResult = rewriteDataFileResult;
}
public WriteResult getWriteResult() {
return writeResult;
}
+ @Nullable
+ public RewriteDataFileResult rewriteDataFileResult() {
+ return rewriteDataFileResult;
+ }
+
@Override
public String toString() {
return "IcebergWriteResult{"
@@ -43,6 +58,9 @@ public class IcebergWriteResult implements Serializable {
+ writeResult.dataFiles().length
+ ", deleteFiles="
+ writeResult.deleteFiles().length
+ + (rewriteDataFileResult != null
+ ? (", rewriteDataFiles=" + rewriteDataFileResult)
+ : "")
+ '}';
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
index 26bb02a77..fe48e3af2 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
@@ -18,11 +18,16 @@
package org.apache.fluss.lake.iceberg.tiering;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.utils.InstantiationUtils;
import org.apache.iceberg.io.WriteResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
/** Serializer for {@link IcebergWriteResult}. */
@@ -37,18 +42,64 @@ public class IcebergWriteResultSerializer implements SimpleVersionedSerializer<I
@Override
public byte[] serialize(IcebergWriteResult icebergWriteResult) throws IOException {
- return InstantiationUtils.serializeObject(icebergWriteResult.getWriteResult());
+ byte[] writeResultBytes =
+ InstantiationUtils.serializeObject(icebergWriteResult.getWriteResult());
+
+ RewriteDataFileResult rewriteDataFileResult = icebergWriteResult.rewriteDataFileResult();
+ byte[] rewriteResultBytes =
+ rewriteDataFileResult == null
+ ? null
+ : InstantiationUtils.serializeObject(rewriteDataFileResult);
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream(writeResultBytes.length);
+ DataOutputStream dos = new DataOutputStream(baos)) {
+ // Frame WriteResult
+ dos.writeInt(writeResultBytes.length);
+ dos.write(writeResultBytes);
+
+ // optional rewrite result
+ boolean hasRewrite = rewriteResultBytes != null;
+ dos.writeBoolean(hasRewrite);
+ if (hasRewrite) {
+ dos.writeInt(rewriteResultBytes.length);
+ dos.write(rewriteResultBytes);
+ }
+ return baos.toByteArray();
+ }
}
@Override
public IcebergWriteResult deserialize(int version, byte[] serialized) throws IOException {
WriteResult writeResult;
- try {
+ RewriteDataFileResult rewriteDataFileResult;
+ try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serialized))) {
+ int wrLen = dis.readInt();
+ if (wrLen < 0 || wrLen > serialized.length) {
+ throw new IOException(
+ "Corrupted serialization: invalid WriteResult length "
+ wrLen);
+ }
+ byte[] wrBytes = new byte[wrLen];
+ dis.readFully(wrBytes);
writeResult =
- InstantiationUtils.deserializeObject(serialized, getClass().getClassLoader());
+ InstantiationUtils.deserializeObject(wrBytes, getClass().getClassLoader());
+
+ boolean hasCompaction = dis.readBoolean();
+ if (hasCompaction) {
+ int crLen = dis.readInt();
+ if (crLen < 0 || crLen > serialized.length) {
+ throw new IOException(
+ "Corrupted serialization: invalid compactionResult
length " + crLen);
+ }
+ byte[] crBytes = new byte[crLen];
+ dis.readFully(crBytes);
+ rewriteDataFileResult =
+ InstantiationUtils.deserializeObject(crBytes, getClass().getClassLoader());
+ } else {
+ rewriteDataFileResult = null;
+ }
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
- return new IcebergWriteResult(writeResult);
+ return new IcebergWriteResult(writeResult, rewriteDataFileResult);
}
}
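
The serializer's wire format is a simple length-prefixed framing; spelled out for reference (derived from the code above):

    // Byte layout produced by serialize(...) and consumed by deserialize(...):
    //   int     writeResultLength
    //   byte[]  writeResultBytes      -- Java-serialized WriteResult
    //   boolean hasRewrite
    //   int     rewriteResultLength   -- present only when hasRewrite is true
    //   byte[]  rewriteResultBytes    -- present only when hasRewrite is true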
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 6f204ca30..53ccb3a80 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -21,12 +21,8 @@ import org.apache.fluss.lake.iceberg.tiering.RecordWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.record.LogRecord;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
/** A {@link RecordWriter} to write to Iceberg's append-only table. */
@@ -35,35 +31,14 @@ public class AppendOnlyTaskWriter extends RecordWriter {
public AppendOnlyTaskWriter(
Table icebergTable,
WriterInitContext writerInitContext,
- FileFormat format,
- OutputFileFactory outputFileFactory,
- long targetFileSize) {
+ TaskWriter<Record> taskWriter) {
super(
- createTaskWriter(
- icebergTable, writerInitContext, format, outputFileFactory, targetFileSize),
+ taskWriter,
icebergTable.schema(),
writerInitContext.schema().getRowType(),
writerInitContext.tableBucket());
}
- private static TaskWriter<Record> createTaskWriter(
- Table icebergTable,
- WriterInitContext writerInitContext,
- FileFormat format,
- OutputFileFactory outputFileFactory,
- long targetFileSize) {
- FileAppenderFactory<Record> fileAppenderFactory =
- new GenericAppenderFactory(icebergTable.schema(),
icebergTable.spec());
- return new GenericRecordAppendOnlyWriter(
- icebergTable,
- format,
- fileAppenderFactory,
- outputFileFactory,
- icebergTable.io(),
- targetFileSize,
- writerInitContext);
- }
-
@Override
public void write(LogRecord record) throws Exception {
flussRecordAsIcebergRecord.setFlussRecord(record);
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index 677ae1e67..cbab8fdf9 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -21,69 +21,24 @@ import org.apache.fluss.lake.iceberg.tiering.RecordWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.record.LogRecord;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
-import java.util.ArrayList;
-import java.util.List;
-
/** A {@link RecordWriter} to write to Iceberg's primary-key table. */
public class DeltaTaskWriter extends RecordWriter {
public DeltaTaskWriter(
Table icebergTable,
WriterInitContext writerInitContext,
- FileFormat format,
- OutputFileFactory outputFileFactory,
- long targetFileSize) {
+ TaskWriter<Record> taskWriter) {
super(
- createTaskWriter(
- icebergTable, format, outputFileFactory, targetFileSize, writerInitContext),
+ taskWriter,
icebergTable.schema(),
writerInitContext.schema().getRowType(),
writerInitContext.tableBucket());
}
- private static TaskWriter<Record> createTaskWriter(
- Table icebergTable,
- FileFormat format,
- OutputFileFactory outputFileFactory,
- long targetFileSize,
- WriterInitContext writerInitContext) {
- int[] equalityFieldIds =
- icebergTable.schema().identifierFieldIds().stream()
- .mapToInt(Integer::intValue)
- .toArray();
- FileAppenderFactory<Record> appenderFactory =
- new GenericAppenderFactory(
- icebergTable.schema(),
- icebergTable.spec(),
- equalityFieldIds,
- icebergTable.schema(),
- null);
-
- List<String> columns = new ArrayList<>();
- for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
- columns.add(icebergTable.schema().findField(fieldId).name());
- }
- Schema deleteSchema = icebergTable.schema().select(columns);
- return new GenericRecordDeltaWriter(
- icebergTable,
- deleteSchema,
- format,
- appenderFactory,
- outputFileFactory,
- icebergTable.io(),
- targetFileSize,
- writerInitContext);
- }
-
@Override
public void write(LogRecord record) throws Exception {
GenericRecordDeltaWriter deltaWriter = (GenericRecordDeltaWriter) taskWriter;
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
index 8d5130304..0933ccb26 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
@@ -18,8 +18,6 @@
package org.apache.fluss.lake.iceberg.tiering.writer;
-import org.apache.fluss.lake.writer.WriterInitContext;
-
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
@@ -28,6 +26,8 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
@@ -44,14 +44,10 @@ public class GenericRecordAppendOnlyWriter extends BaseTaskWriter<Record> {
OutputFileFactory fileFactory,
FileIO io,
long targetFileSize,
- WriterInitContext writerInitContext) {
+ @Nullable String partitionName,
+ int bucket) {
super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
- currentWriter =
- new RollingFileWriter(
- toPartition(
- icebergTable,
- writerInitContext.partition(),
- writerInitContext.tableBucket().getBucket()));
+ currentWriter = new RollingFileWriter(toPartition(icebergTable, partitionName, bucket));
}
@Override
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
index d327ef176..22abafb3a 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
@@ -17,8 +17,6 @@
package org.apache.fluss.lake.iceberg.tiering.writer;
-import org.apache.fluss.lake.writer.WriterInitContext;
-
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Schema;
@@ -30,6 +28,8 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
@@ -46,14 +46,12 @@ class GenericRecordDeltaWriter extends BaseTaskWriter<Record> {
OutputFileFactory fileFactory,
FileIO io,
long targetFileSize,
- WriterInitContext writerInitContext) {
+ @Nullable String partition,
+ int bucket) {
super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
this.deltaWriter =
new GenericEqualityDeltaWriter(
- toPartition(
- icebergTable,
- writerInitContext.partition(),
- writerInitContext.tableBucket().getBucket()),
+ toPartition(icebergTable, partition, bucket),
icebergTable.schema(),
deleteSchema);
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java
new file mode 100644
index 000000000..2e5cf1223
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/** A factory to create Iceberg {@link TaskWriter}. */
+public class TaskWriterFactory {
+
+ public static TaskWriter<Record> createTaskWriter(
+ Table table, @Nullable String partition, int bucket) {
+ Schema schema = table.schema();
+ int[] equalityFieldIds =
+ schema.identifierFieldIds().stream().mapToInt(Integer::intValue).toArray();
+
+ // Get target file size from table properties
+ long targetFileSize = targetFileSize(table);
+ FileFormat format = fileFormat(table);
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(
+ table,
+ bucket,
+ // task id always 0
+ 0)
+ .format(format)
+ .build();
+
+ if (equalityFieldIds.length == 0) {
+ FileAppenderFactory<Record> fileAppenderFactory =
+ new GenericAppenderFactory(schema, table.spec());
+ return new GenericRecordAppendOnlyWriter(
+ table,
+ format,
+ fileAppenderFactory,
+ outputFileFactory,
+ table.io(),
+ targetFileSize,
+ partition,
+ bucket);
+
+ } else {
+ FileAppenderFactory<Record> appenderFactory =
+ new GenericAppenderFactory(
+ schema, table.spec(), equalityFieldIds, schema, null);
+ List<String> columns = new ArrayList<>();
+ for (Integer fieldId : schema.identifierFieldIds()) {
+ columns.add(schema.findField(fieldId).name());
+ }
+ Schema deleteSchema = schema.select(columns);
+ return new GenericRecordDeltaWriter(
+ table,
+ deleteSchema,
+ format,
+ appenderFactory,
+ outputFileFactory,
+ table.io(),
+ targetFileSize,
+ partition,
+ bucket);
+ }
+ }
+
+ private static FileFormat fileFormat(Table icebergTable) {
+ String formatString =
+ PropertyUtil.propertyAsString(
+ icebergTable.properties(),
+ TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.fromString(formatString);
+ }
+
+ private static long targetFileSize(Table icebergTable) {
+ return PropertyUtil.propertyAsLong(
+ icebergTable.properties(),
+ WRITE_TARGET_FILE_SIZE_BYTES,
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 2b53fe7a3..af20ff594 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -19,14 +19,19 @@ package org.apache.fluss.lake.iceberg.utils;
import org.apache.fluss.metadata.TablePath;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import javax.annotation.Nullable;
+import java.util.List;
+
import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
/** Utility class for static conversions between Fluss and Iceberg types. */
@@ -52,4 +57,26 @@ public class IcebergConversions {
partitionKey.set(pos, bucket);
return partitionKey;
}
+
+ public static Expression toFilterExpression(
+ Table table, @Nullable String partitionName, int bucket) {
+ List<PartitionField> partitionFields = table.spec().fields();
+ Expression expression = Expressions.alwaysTrue();
+ int partitionIndex = 0;
+ if (partitionName != null) {
+ String[] partitionArr = partitionName.split("\\" + PARTITION_SPEC_SEPARATOR);
+ for (String partition : partitionArr) {
+ expression =
+ Expressions.and(
+ expression,
+ Expressions.equal(
+ partitionFields.get(partitionIndex++).name(), partition));
+ }
+ }
+ expression =
+ Expressions.and(
+ expression,
+ Expressions.equal(partitionFields.get(partitionIndex).name(), bucket));
+ return expression;
+ }
}
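
For concreteness, toFilterExpression ANDs one equality predicate per Fluss partition value, then one on the bucket partition field; e.g. for a spec with a single dt partition column (a hypothetical name) followed by the bucket identity field:

    // Hypothetical: spec fields [dt, __bucket], partitionName "20240101", bucket 0 yields
    //   alwaysTrue() AND dt == "20240101" AND __bucket == 0
    Expression filter = IcebergConversions.toFilterExpression(table, "20240101", 0);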
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java
new file mode 100644
index 000000000..c496c6fd8
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableScan;
+
+/**
+ * A generic reader for Iceberg records. Extends Iceberg's {@link GenericReader} to
+ * enable the {@link #open(FileScanTask)} method to be visible to Fluss.
+ */
+public class IcebergGenericReader extends GenericReader {
+
+ public IcebergGenericReader(TableScan scan, boolean reuseContainers) {
+ super(scan, reuseContainers);
+ }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
new file mode 100644
index 000000000..264588bee
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test to verify compaction via {@link IcebergRewriteDataFiles}. */
+class IcebergRewriteTest {
+
+ private @TempDir File tempWarehouseDir;
+ private Catalog icebergCatalog;
+
+ @BeforeEach
+ void setUp() {
+ Configuration configuration = new Configuration();
+ configuration.setString("warehouse", "file://" + tempWarehouseDir);
+ configuration.setString("type", "hadoop");
+ configuration.setString("name", "test");
+ IcebergCatalogProvider provider = new IcebergCatalogProvider(configuration);
+ icebergCatalog = provider.get();
+ }
+
+ @Test
+ void testSingleBucketRewrite() throws Exception {
+ TablePath tablePath = TablePath.of("iceberg", "compact_table");
+ createTable(tablePath);
+ Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+ int bucket = 0;
+ appendTinyFilesWithRowsAndBucket(icebergTable, 2, 3, 1000, bucket);
+ icebergTable.refresh();
+
+ int filesBefore = countDataFiles(icebergTable);
+ // we expect exactly 2 files in this no-op compaction scenario,
+ // since rewrite only happens when there are >= 3 small files
+ assertThat(filesBefore).isEqualTo(2);
+
+ IcebergRewriteDataFiles icebergRewriteDataFiles =
+ createIcebergRewriteDataFiles(icebergTable, bucket);
+
+ // do rewrite; the result should be null since no rewrite should happen
+ RewriteDataFileResult rewriteDataFileResult = icebergRewriteDataFiles.execute();
+ assertThat(rewriteDataFileResult).isNull();
+
+ // append some files again; now a rewrite should happen
+ appendTinyFilesWithRowsAndBucket(icebergTable, 2, 3, 2000, bucket);
+
+ long rowsBefore = countRows(icebergTable);
+
+ icebergRewriteDataFiles = createIcebergRewriteDataFiles(icebergTable, bucket);
+ rewriteDataFileResult = icebergRewriteDataFiles.execute();
+
+ // verify the rewrite result
+ assertThat(rewriteDataFileResult).isNotNull();
+ assertThat(rewriteDataFileResult.deletedDataFiles()).hasSize(4);
+ assertThat(rewriteDataFileResult.addedDataFiles()).hasSize(1);
+
+ // commit
+ commitRewrite(icebergTable, rewriteDataFileResult);
+ icebergTable.refresh();
+ // only one file now
+ assertThat(countDataFiles(icebergTable)).isEqualTo(1);
+
+ // try compact again, should do nothing
+ icebergRewriteDataFiles = createIcebergRewriteDataFiles(icebergTable, bucket);
+ rewriteDataFileResult = icebergRewriteDataFiles.execute();
+ assertThat(rewriteDataFileResult).isNull();
+
+ // compact shouldn't change row counts
+ long rowsAfter = countRows(icebergTable);
+ assertThat(rowsAfter).isEqualTo(rowsBefore);
+ }
+
+ @Test
+ void testMultipleBucketRewrite() throws Exception {
+ TablePath tablePath = TablePath.of("iceberg", "rewrite_bucket_scoped");
+ createTable(tablePath);
+
+ Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+ // Seed bucket 0: 3 tiny files, bucket 1: 3 tiny files
+ appendTinyFilesWithRowsAndBucket(table, 3, 1, 4000, 0);
+ appendTinyFilesWithRowsAndBucket(table, 3, 1, 5000, 1);
+ table.refresh();
+
+ int filesBeforeBucket0 = countFilesForBucket(table, 0);
+ int filesBeforeBucket1 = countFilesForBucket(table, 1);
+ assertThat(filesBeforeBucket0).isEqualTo(3);
+ assertThat(filesBeforeBucket1).isEqualTo(3);
+
+ // rewrite only bucket 0
+ RewriteDataFileResult rewriteDataFileResult =
+ createIcebergRewriteDataFiles(table, 0).execute();
+ assertThat(rewriteDataFileResult).isNotNull();
+ commitRewrite(table, rewriteDataFileResult);
+ table.refresh();
+
+ int filesAfterBucket0 = countFilesForBucket(table, 0);
+ int filesAfterBucket1 = countFilesForBucket(table, 1);
+ assertThat(filesAfterBucket0).isEqualTo(1);
+ assertThat(filesAfterBucket1).isEqualTo(3);
+
+ // rewrite only bucket 1
+ RewriteDataFileResult res1 = createIcebergRewriteDataFiles(table, 1).execute();
+ assertThat(res1).isNotNull();
+ commitRewrite(table, res1);
+ table.refresh();
+ filesAfterBucket1 = countFilesForBucket(table, 1);
+ assertThat(filesAfterBucket1).isEqualTo(1);
+ }
+
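+ // Scopes the rewrite to the given bucket; table id 0 refers to the single test table.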
+ private IcebergRewriteDataFiles createIcebergRewriteDataFiles(Table table, int bucket) {
+ table.refresh();
+ return new IcebergRewriteDataFiles(table, null, new TableBucket(0, bucket));
+ }
+
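+ // Applies a rewrite result to the table through Iceberg's RewriteFiles snapshot update.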
+ private void commitRewrite(Table table, RewriteDataFileResult rewriteDataFileResult) {
+ table.refresh();
+ RewriteFiles rewriteFiles = table.newRewrite();
+ rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+ rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+ rewriteFiles.commit();
+ }
+
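+ // Creates a test table carrying the Fluss system columns (bucket, offset, timestamp),
+ // identity-partitioned by the bucket column so files can be rewritten per bucket.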
+ private void createTable(TablePath tablePath) {
+ Namespace namespace = Namespace.of(tablePath.getDatabaseName());
+ SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
+ if (!ns.namespaceExists(namespace)) {
+ ns.createNamespace(namespace);
+ }
+
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "c2", Types.StringType.get()),
+ Types.NestedField.optional(3, "c3", Types.StringType.get()),
+ Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+ Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get()),
+ Types.NestedField.required(6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+
+ PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(schema).identity(BUCKET_COLUMN_NAME).build();
+ TableIdentifier tableId =
+ TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
+ icebergCatalog.createTable(tableId, schema, partitionSpec);
+ }
+
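+ // Counts data files by planning a full table scan.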
+ private static int countDataFiles(Table table) throws IOException {
+ int count = 0;
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (FileScanTask ignored : tasks) {
+ count++;
+ }
+ }
+ return count;
+ }
+
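+ // Counts rows by reading all records back through Iceberg generics.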
+ private static long countRows(Table table) {
+ long cnt = 0L;
+ try (CloseableIterable<Record> it = IcebergGenerics.read(table).build()) {
+ for (Record ignored : it) {
+ cnt++;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return cnt;
+ }
+
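+ // Counts data files whose bucket column equals the given bucket.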
+ private static int countFilesForBucket(Table table, int bucket) throws IOException {
+ int count = 0;
+ try (CloseableIterable<FileScanTask> tasks =
+ table.newScan().filter(Expressions.equal(BUCKET_COLUMN_NAME, bucket)).planFiles()) {
+ for (FileScanTask ignored : tasks) {
+ count++;
+ }
+ }
+ return count;
+ }
+
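+ // Writes the given number of tiny files (rowsPerFile rows each) and commits them in a single append.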
+ private static void appendTinyFilesWithRowsAndBucket(
+ Table table, int files, int rowsPerFile, int baseOffset, int bucket) throws Exception {
+ List<DataFile> toAppend = new ArrayList<>(files);
+ for (int i = 0; i < files; i++) {
+ toAppend.add(
+ writeTinyDataFile(table, rowsPerFile, baseOffset + (i * rowsPerFile), bucket));
+ }
+ AppendFiles append = table.newAppend();
+ for (DataFile f : toAppend) {
+ append.appendFile(f);
+ }
+ append.commit();
+ }
+
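+ // Writes one small data file for the given bucket via the tiering TaskWriterFactory.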
+ private static DataFile writeTinyDataFile(Table table, int rows, int startOffset, int bucket)
+ throws Exception {
+ try (TaskWriter<Record> taskWriter =
+ TaskWriterFactory.createTaskWriter(table, null, bucket)) {
+ for (int i = 0; i < rows; i++) {
+ Record r = org.apache.iceberg.data.GenericRecord.create(table.schema());
+ r.setField("c1", i);
+ r.setField("c2", "v_" + i);
+ r.setField("c3", "g");
+ r.setField(BUCKET_COLUMN_NAME, bucket);
+ r.setField(OFFSET_COLUMN_NAME, (long) (startOffset + i));
+ r.setField(
+ TIMESTAMP_COLUMN_NAME,
+ java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC));
+ taskWriter.write(r);
+ }
+ DataFile[] dataFiles = taskWriter.dataFiles();
+ checkState(dataFiles.length == 1);
+ return dataFiles[0];
+ }
+ }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
new file mode 100644
index 000000000..207714305
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering;
+
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link IcebergWriteResultSerializer}. */
+class IcebergWriteResultSerializerTest {
+
+ private IcebergWriteResultSerializer serializer;
+ private Schema schema;
+ private PartitionSpec spec;
+ private GenericRecord partitionData;
+
+ @BeforeEach
+ void setUp() {
+ serializer = new IcebergWriteResultSerializer();
+ schema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "category", Types.StringType.get()));
+ spec = PartitionSpec.builderFor(schema).identity("category").build();
+ partitionData = GenericRecord.create(spec.partitionType());
+ partitionData.setField("category", "A");
+ }
+
+ @Test
+ void testSerializeAndDeserializeWithAllFiles() throws IOException {
+ // 1. Arrange: Create a complex write result
+ DataFile dataFile = createDataFile("/data/file1.parquet", 100L);
+ DeleteFile deleteFile = createDeleteFile("/data/deletes1.parquet", 10L);
+
+ // no rewrite result
+ WriteResult writeResult =
+ WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build();
+
+ IcebergWriteResult originalResult = new IcebergWriteResult(writeResult, null);
+
+ // 2. Act: Serialize and then deserialize the object
+ byte[] serializedData = serializer.serialize(originalResult);
+ IcebergWriteResult deserializedResult =
+ serializer.deserialize(serializer.getVersion(), serializedData);
+ assertThat(deserializedResult.toString()).isEqualTo(originalResult.toString());
+
+ // with rewrite result
+ RewriteDataFileResult rewriteDataFileResult =
+ new RewriteDataFileResult(
+ Collections.singletonList(dataFile), Collections.singletonList(dataFile));
+ originalResult = new IcebergWriteResult(writeResult, rewriteDataFileResult);
+ serializedData = serializer.serialize(originalResult);
+ deserializedResult = serializer.deserialize(serializer.getVersion(), serializedData);
+ assertThat(deserializedResult.toString()).isEqualTo(originalResult.toString());
+ }
+
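+ // Builds position-delete file metadata for partition "A"; no physical file is written.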
+ private DeleteFile createDeleteFile(String path, long recordCount) {
+ return FileMetadata.deleteFileBuilder(spec)
+ .withPath(path)
+ .withFileSizeInBytes(1024)
+ .withPartition(partitionData)
+ .withRecordCount(recordCount)
+ .withFormat(FileFormat.PARQUET)
+ .ofPositionDeletes()
+ .build();
+ }
+
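+ // Builds data file metadata for partition "A"; the serialization test only needs the descriptor.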
+ private DataFile createDataFile(String path, long recordCount) {
+ return DataFiles.builder(spec)
+ .withPath(path)
+ .withFileSizeInBytes(4096)
+ .withPartition(partitionData)
+ .withRecordCount(recordCount)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+ }
+}