This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a7bf0077a [lake/iceberg] implement Iceberg log table compaction IT 
(#1651)
a7bf0077a is described below

commit a7bf0077ab42cdaee96b85c6b105fc743b7f2239
Author: xx789 <[email protected]>
AuthorDate: Mon Sep 8 14:10:23 2025 +0800

    [lake/iceberg] implement Iceberg log table compaction IT (#1651)
    
    ---------
    
    Co-authored-by: maxcwang <[email protected]>
---
 .../maintenance/IcebergRewriteDataFiles.java       |  1 +
 .../lake/iceberg/tiering/IcebergLakeWriter.java    | 18 ++---
 .../lake/iceberg/utils/IcebergConversions.java     | 13 ++--
 .../iceberg/maintenance/IcebergRewriteITCase.java  | 90 ++++++++++++++++++++++
 .../testutils/FlinkIcebergTieringTestBase.java     | 49 +++++++++---
 5 files changed, 144 insertions(+), 27 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
index a74507655..742a808a3 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -61,6 +61,7 @@ public class IcebergRewriteDataFiles {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
 
+    // TODO: make compaction strategy configurable
     private static final int MIN_FILES_TO_COMPACT = 3;
 
     private final Table table;
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index da9d78f71..227112eae 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -56,12 +56,9 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(IcebergLakeWriter.class);
 
-    private static final String AUTO_MAINTENANCE_KEY = 
"table.datalake.auto-maintenance";
-
     private final Catalog icebergCatalog;
     private final Table icebergTable;
     private final RecordWriter recordWriter;
-    private final boolean autoMaintenanceEnabled;
 
     @Nullable private final ExecutorService compactionExecutor;
     @Nullable private CompletableFuture<RewriteDataFileResult> 
compactionFuture;
@@ -72,15 +69,10 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
         this.icebergCatalog = icebergCatalogProvider.get();
         this.icebergTable = getTable(writerInitContext.tablePath());
 
-        // Check auto-maintenance from table properties
-        this.autoMaintenanceEnabled =
-                Boolean.parseBoolean(
-                        
icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
-
         // Create a record writer
         this.recordWriter = createRecordWriter(writerInitContext);
 
-        if (autoMaintenanceEnabled) {
+        if 
(writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()) {
             this.compactionExecutor =
                     Executors.newSingleThreadExecutor(
                             new ExecutorThreadFactory(
@@ -137,9 +129,11 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
                 compactionFuture.cancel(true);
             }
 
-            if (compactionExecutor != null
-                    && !compactionExecutor.awaitTermination(30, 
TimeUnit.SECONDS)) {
-                LOG.warn("Fail to close compactionExecutor.");
+            if (compactionExecutor != null) {
+                compactionExecutor.shutdown();
+                if (!compactionExecutor.awaitTermination(30, 
TimeUnit.SECONDS)) {
+                    LOG.warn("Fail to close compactionExecutor.");
+                }
             }
 
             if (recordWriter != null) {
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index af20ff594..f12b505b1 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 import static 
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 
 /** Utility class for static conversions between Fluss and Iceberg types. */
 public class IcebergConversions {
@@ -70,13 +71,15 @@ public class IcebergConversions {
                         Expressions.and(
                                 expression,
                                 Expressions.equal(
-                                        
partitionFields.get(partitionIndex++).name(), partition));
+                                        table.schema()
+                                                .findColumnName(
+                                                        partitionFields
+                                                                
.get(partitionIndex++)
+                                                                .sourceId()),
+                                        partition));
             }
         }
-        expression =
-                Expressions.and(
-                        expression,
-                        
Expressions.equal(partitionFields.get(partitionIndex).name(), bucket));
+        expression = Expressions.and(expression, 
Expressions.equal(BUCKET_COLUMN_NAME, bucket));
         return expression;
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
new file mode 100644
index 000000000..a1a28b2d0
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** Integration test for Iceberg compaction. */
+class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkIcebergTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testLogTableCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
+            long t1Id = createLogTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            int i = 0;
+            List<InternalRow> flussRows = new ArrayList<>();
+            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+
+            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+
+            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            checkFileCountInIcebergTable(t1, 3);
+
+            // Write should trigger compaction now since the current data file 
count is greater or
+            // equal MIN_FILES_TO_COMPACT
+            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            // Should only have two files now, one file it for newly written, 
one file is for target
+            // compacted file
+            checkFileCountInIcebergTable(t1, 2);
+
+            // check data in iceberg to make sure compaction won't lose data 
or duplicate data
+            checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    private List<InternalRow> writeLogTableRecords(
+            TablePath tablePath, TableBucket tableBucket, long 
expectedLogEndOffset)
+            throws Exception {
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"));
+        writeRows(tablePath, rows, true);
+        assertReplicaStatus(tableBucket, expectedLogEndOffset);
+        return rows;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 5eafb47e8..fdf5b02c5 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
@@ -52,6 +53,7 @@ import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.parquet.Parquet;
 import org.junit.jupiter.api.AfterAll;
@@ -60,6 +62,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.Files;
@@ -185,18 +188,25 @@ public class FlinkIcebergTieringTestBase {
     }
 
     protected long createPkTable(TablePath tablePath) throws Exception {
-        return createPkTable(tablePath, 1);
+        return createPkTable(tablePath, false);
+    }
+
+    protected long createPkTable(TablePath tablePath, boolean 
enableAutoCompaction)
+            throws Exception {
+        return createPkTable(tablePath, 1, enableAutoCompaction);
     }
 
     protected long createLogTable(TablePath tablePath) throws Exception {
-        return createLogTable(tablePath, 1);
+        return createLogTable(tablePath, false);
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum) throws 
Exception {
-        return createLogTable(tablePath, bucketNum, false);
+    protected long createLogTable(TablePath tablePath, boolean 
enableAutoCompaction)
+            throws Exception {
+        return createLogTable(tablePath, 1, false, enableAutoCompaction);
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum, boolean 
isPartitioned)
+    protected long createLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean 
enableAutoCompaction)
             throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder().column("a", DataTypes.INT()).column("b", 
DataTypes.STRING());
@@ -214,12 +224,16 @@ public class FlinkIcebergTieringTestBase {
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
         }
+        if (enableAutoCompaction) {
+            
tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), 
"true");
+        }
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
     }
 
-    protected long createPkTable(TablePath tablePath, int bucketNum) throws 
Exception {
-        TableDescriptor table1Descriptor =
+    protected long createPkTable(TablePath tablePath, int bucketNum, boolean 
enableAutoCompaction)
+            throws Exception {
+        TableDescriptor.Builder pkTableBuilder =
                 TableDescriptor.builder()
                         .schema(
                                 Schema.newBuilder()
@@ -246,9 +260,12 @@ public class FlinkIcebergTieringTestBase {
                                         .build())
                         .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
-                        .build();
-        return createTable(tablePath, table1Descriptor);
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        if (enableAutoCompaction) {
+            
pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), 
"true");
+        }
+        return createTable(tablePath, pkTableBuilder.build());
     }
 
     protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
@@ -383,6 +400,18 @@ public class FlinkIcebergTieringTestBase {
         }
     }
 
+    protected void checkFileCountInIcebergTable(TablePath tablePath, int 
expectedFileCount)
+            throws IOException {
+        org.apache.iceberg.Table table = 
icebergCatalog.loadTable(toIceberg(tablePath));
+        int count = 0;
+        try (CloseableIterable<FileScanTask> tasks = 
table.newScan().planFiles()) {
+            for (FileScanTask ignored : tasks) {
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(expectedFileCount);
+    }
+
     protected void checkDataInIcebergAppendOnlyPartitionedTable(
             TablePath tablePath,
             Map<String, String> partitionSpec,

Reply via email to