This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a7bf0077a [lake/iceberg] implement Iceberg log table compaction IT
(#1651)
a7bf0077a is described below
commit a7bf0077ab42cdaee96b85c6b105fc743b7f2239
Author: xx789 <[email protected]>
AuthorDate: Mon Sep 8 14:10:23 2025 +0800
[lake/iceberg] implement Iceberg log table compaction IT (#1651)
---------
Co-authored-by: maxcwang <[email protected]>
---
.../maintenance/IcebergRewriteDataFiles.java | 1 +
.../lake/iceberg/tiering/IcebergLakeWriter.java | 18 ++---
.../lake/iceberg/utils/IcebergConversions.java | 13 ++--
.../iceberg/maintenance/IcebergRewriteITCase.java | 90 ++++++++++++++++++++++
.../testutils/FlinkIcebergTieringTestBase.java | 49 +++++++++---
5 files changed, 144 insertions(+), 27 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
index a74507655..742a808a3 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -61,6 +61,7 @@ public class IcebergRewriteDataFiles {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
+ // TODO: make compaction strategy configurable
private static final int MIN_FILES_TO_COMPACT = 3;
private final Table table;
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index da9d78f71..227112eae 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -56,12 +56,9 @@ public class IcebergLakeWriter implements
LakeWriter<IcebergWriteResult> {
protected static final Logger LOG =
LoggerFactory.getLogger(IcebergLakeWriter.class);
- private static final String AUTO_MAINTENANCE_KEY =
"table.datalake.auto-maintenance";
-
private final Catalog icebergCatalog;
private final Table icebergTable;
private final RecordWriter recordWriter;
- private final boolean autoMaintenanceEnabled;
@Nullable private final ExecutorService compactionExecutor;
@Nullable private CompletableFuture<RewriteDataFileResult>
compactionFuture;
@@ -72,15 +69,10 @@ public class IcebergLakeWriter implements
LakeWriter<IcebergWriteResult> {
this.icebergCatalog = icebergCatalogProvider.get();
this.icebergTable = getTable(writerInitContext.tablePath());
- // Check auto-maintenance from table properties
- this.autoMaintenanceEnabled =
- Boolean.parseBoolean(
-
icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
-
// Create a record writer
this.recordWriter = createRecordWriter(writerInitContext);
- if (autoMaintenanceEnabled) {
+ if
(writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()) {
this.compactionExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
@@ -137,9 +129,11 @@ public class IcebergLakeWriter implements
LakeWriter<IcebergWriteResult> {
compactionFuture.cancel(true);
}
- if (compactionExecutor != null
- && !compactionExecutor.awaitTermination(30,
TimeUnit.SECONDS)) {
- LOG.warn("Fail to close compactionExecutor.");
+ if (compactionExecutor != null) {
+ compactionExecutor.shutdown();
+ if (!compactionExecutor.awaitTermination(30,
TimeUnit.SECONDS)) {
+ LOG.warn("Fail to close compactionExecutor.");
+ }
}
if (recordWriter != null) {
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index af20ff594..f12b505b1 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.util.List;
import static
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
/** Utility class for static conversions between Fluss and Iceberg types. */
public class IcebergConversions {
@@ -70,13 +71,15 @@ public class IcebergConversions {
Expressions.and(
expression,
Expressions.equal(
-
partitionFields.get(partitionIndex++).name(), partition));
+ table.schema()
+ .findColumnName(
+ partitionFields
+
.get(partitionIndex++)
+ .sourceId()),
+ partition));
}
}
- expression =
- Expressions.and(
- expression,
-
Expressions.equal(partitionFields.get(partitionIndex).name(), bucket));
+ expression = Expressions.and(expression,
Expressions.equal(BUCKET_COLUMN_NAME, bucket));
return expression;
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
new file mode 100644
index 000000000..a1a28b2d0
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** Integration test for Iceberg compaction. */
+class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+
+ private static StreamExecutionEnvironment execEnv;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkIcebergTieringTestBase.beforeAll();
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+ execEnv.enableCheckpointing(1000);
+ }
+
+ @Test
+ void testLogTableCompaction() throws Exception {
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
+ long t1Id = createLogTable(t1, true);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ int i = 0;
+ List<InternalRow> flussRows = new ArrayList<>();
+ flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+
+ flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+
+ flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+ checkFileCountInIcebergTable(t1, 3);
+
+        // Write should trigger compaction now since the current data file
count is greater than
+        // or equal to MIN_FILES_TO_COMPACT
+        flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+        // Should only have two files now: one file is for the newly written
data, one file is
+        // for the target compacted file
+ checkFileCountInIcebergTable(t1, 2);
+
+        // check data in iceberg to make sure compaction won't lose or
duplicate data
+ checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+ private List<InternalRow> writeLogTableRecords(
+ TablePath tablePath, TableBucket tableBucket, long
expectedLogEndOffset)
+ throws Exception {
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"));
+ writeRows(tablePath, rows, true);
+ assertReplicaStatus(tableBucket, expectedLogEndOffset);
+ return rows;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 5eafb47e8..fdf5b02c5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
@@ -52,6 +53,7 @@ import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.parquet.Parquet;
import org.junit.jupiter.api.AfterAll;
@@ -60,6 +62,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.Closeable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
@@ -185,18 +188,25 @@ public class FlinkIcebergTieringTestBase {
}
protected long createPkTable(TablePath tablePath) throws Exception {
- return createPkTable(tablePath, 1);
+ return createPkTable(tablePath, false);
+ }
+
+ protected long createPkTable(TablePath tablePath, boolean
enableAutoCompaction)
+ throws Exception {
+ return createPkTable(tablePath, 1, enableAutoCompaction);
}
protected long createLogTable(TablePath tablePath) throws Exception {
- return createLogTable(tablePath, 1);
+ return createLogTable(tablePath, false);
}
- protected long createLogTable(TablePath tablePath, int bucketNum) throws
Exception {
- return createLogTable(tablePath, bucketNum, false);
+ protected long createLogTable(TablePath tablePath, boolean
enableAutoCompaction)
+ throws Exception {
+ return createLogTable(tablePath, 1, false, enableAutoCompaction);
}
- protected long createLogTable(TablePath tablePath, int bucketNum, boolean
isPartitioned)
+ protected long createLogTable(
+ TablePath tablePath, int bucketNum, boolean isPartitioned, boolean
enableAutoCompaction)
throws Exception {
Schema.Builder schemaBuilder =
Schema.newBuilder().column("a", DataTypes.INT()).column("b",
DataTypes.STRING());
@@ -214,12 +224,16 @@ public class FlinkIcebergTieringTestBase {
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
}
+ if (enableAutoCompaction) {
+
tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(),
"true");
+ }
tableBuilder.schema(schemaBuilder.build());
return createTable(tablePath, tableBuilder.build());
}
- protected long createPkTable(TablePath tablePath, int bucketNum) throws
Exception {
- TableDescriptor table1Descriptor =
+ protected long createPkTable(TablePath tablePath, int bucketNum, boolean
enableAutoCompaction)
+ throws Exception {
+ TableDescriptor.Builder pkTableBuilder =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
@@ -246,9 +260,12 @@ public class FlinkIcebergTieringTestBase {
.build())
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500))
- .build();
- return createTable(tablePath, table1Descriptor);
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ if (enableAutoCompaction) {
+
pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(),
"true");
+ }
+ return createTable(tablePath, pkTableBuilder.build());
}
protected long createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
@@ -383,6 +400,18 @@ public class FlinkIcebergTieringTestBase {
}
}
+ protected void checkFileCountInIcebergTable(TablePath tablePath, int
expectedFileCount)
+ throws IOException {
+ org.apache.iceberg.Table table =
icebergCatalog.loadTable(toIceberg(tablePath));
+ int count = 0;
+ try (CloseableIterable<FileScanTask> tasks =
table.newScan().planFiles()) {
+ for (FileScanTask ignored : tasks) {
+ count++;
+ }
+ }
+ assertThat(count).isEqualTo(expectedFileCount);
+ }
+
protected void checkDataInIcebergAppendOnlyPartitionedTable(
TablePath tablePath,
Map<String, String> partitionSpec,