This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 683ac54636 [core] Simplify force rewrite files in Compact Task (#5751)
683ac54636 is described below

commit 683ac546362ed00e98adda50d16e25e4086d4d04
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:16:53 2025 +0800

    [core] Simplify force rewrite files in Compact Task (#5751)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 +--
 .../append/BucketedAppendCompactManager.java       | 20 +++----
 .../org/apache/paimon/compact/CompactUnit.java     | 43 ++++++++------
 .../paimon/mergetree/compact/CompactStrategy.java  | 35 ++++-------
 .../mergetree/compact/FileRewriteCompactTask.java  | 67 ++++++++++++++++++++++
 .../mergetree/compact/MergeTreeCompactManager.java | 45 +++++++++------
 .../mergetree/compact/MergeTreeCompactTask.java    | 59 ++++++-------------
 .../operation/BucketedAppendFileStoreWrite.java    |  2 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  2 +-
 .../flink/procedure/CompactProcedureITCase.java    |  4 +-
 11 files changed, 167 insertions(+), 120 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 6bee1104a1..d7b21f18f7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -183,7 +183,7 @@ under the License.
             <td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td>
         </tr>
         <tr>
-            <td><h5>compaction.force-compact-all-files</h5></td>
+            <td><h5>compaction.force-rewrite-all-files</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
             <td>Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 47967be549..4c446dc4e5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -151,8 +151,8 @@ public class CoreOptions implements Serializable {
                                     + ExternalPathStrategy.SPECIFIC_FS
                                     + ", should be the prefix scheme of the 
external path, now supported are s3 and oss.");
 
-    public static final ConfigOption<Boolean> COMPACTION_FORCE_COMPACT_ALL_FILES =
-            key("compaction.force-compact-all-files")
+    public static final ConfigOption<Boolean> COMPACTION_FORCE_REWRITE_ALL_FILES =
+            key("compaction.force-rewrite-all-files")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
@@ -2473,8 +2473,8 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
     }
 
-    public Boolean forceCompactAllFiles() {
-        return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
+    public Boolean forceRewriteAllFiles() {
+        return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
     }
 
     public String partitionTimestampFormatter() {
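
For reference, a minimal sketch of setting and reading the renamed option. The Options wiring and the class around it are illustrative assumptions; only the key, the constant, and the accessor come from this commit:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class ForceRewriteOptionSketch {
        public static void main(String[] args) {
            Options conf = new Options();
            // New key introduced by this commit; it replaces
            // "compaction.force-compact-all-files".
            conf.set(CoreOptions.COMPACTION_FORCE_REWRITE_ALL_FILES, true);

            CoreOptions coreOptions = new CoreOptions(conf);
            // Renamed accessor; previously forceCompactAllFiles().
            System.out.println(coreOptions.forceRewriteAllFiles()); // true
        }
    }
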
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 520fd8edb3..a27a40e8a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -60,7 +60,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
     private final PriorityQueue<DataFileMeta> toCompact;
     private final int minFileNum;
     private final long targetFileSize;
-    private final boolean forceCompactAllFiles;
+    private final boolean forceRewriteAllFiles;
     private final CompactRewriter rewriter;
 
     private List<DataFileMeta> compacting;
@@ -73,7 +73,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
             @Nullable DeletionVectorsMaintainer dvMaintainer,
             int minFileNum,
             long targetFileSize,
-            boolean forceCompactAllFiles,
+            boolean forceRewriteAllFiles,
             CompactRewriter rewriter,
             @Nullable CompactionMetrics.Reporter metricsReporter) {
         this.executor = executor;
@@ -82,7 +82,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
         this.toCompact.addAll(restored);
         this.minFileNum = minFileNum;
         this.targetFileSize = targetFileSize;
-        this.forceCompactAllFiles = forceCompactAllFiles;
+        this.forceRewriteAllFiles = forceRewriteAllFiles;
         this.rewriter = rewriter;
         this.metricsReporter = metricsReporter;
     }
@@ -102,7 +102,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
                 "A compaction task is still running while the user "
                         + "forces a new compaction. This is unexpected.");
        // if all files are force picked or deletion vector enables, always trigger compaction.
-        if (!forceCompactAllFiles
+        if (!forceRewriteAllFiles
                 && (toCompact.isEmpty()
                        || (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE))) {
             return;
@@ -118,7 +118,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
                                 dvMaintainer,
                                 toCompact,
                                 targetFileSize,
-                                forceCompactAllFiles,
+                                forceRewriteAllFiles,
                                 rewriter,
                                 metricsReporter));
         recordCompactionsQueuedRequest();
@@ -243,28 +243,28 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
         private final DeletionVectorsMaintainer dvMaintainer;
         private final LinkedList<DataFileMeta> toCompact;
         private final long targetFileSize;
-        private final boolean forceCompactAllFiles;
+        private final boolean forceRewriteAllFiles;
         private final CompactRewriter rewriter;
 
         public FullCompactTask(
                 DeletionVectorsMaintainer dvMaintainer,
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
-                boolean forceCompactAllFiles,
+                boolean forceRewriteAllFiles,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
             this.dvMaintainer = dvMaintainer;
             this.toCompact = new LinkedList<>(inputs);
             this.targetFileSize = targetFileSize;
-            this.forceCompactAllFiles = forceCompactAllFiles;
+            this.forceRewriteAllFiles = forceRewriteAllFiles;
             this.rewriter = rewriter;
         }
 
         @Override
         protected CompactResult doCompact() throws Exception {
             // remove large files
-            while (!forceCompactAllFiles && !toCompact.isEmpty()) {
+            while (!forceRewriteAllFiles && !toCompact.isEmpty()) {
                 DataFileMeta file = toCompact.peekFirst();
                // the data file with deletion file always need to be compacted.
                if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) {
@@ -289,7 +289,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
                         small++;
                     }
                 }
-                if (forceCompactAllFiles
+                if (forceRewriteAllFiles
                        || (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE)) {
                     return compact(null, toCompact, rewriter);
                 } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
index 9b0fdd26ad..ae367f9171 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
@@ -25,31 +25,40 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** A files unit for compaction. */
-public interface CompactUnit {
+public class CompactUnit {
 
-    int outputLevel();
+    private final int outputLevel;
+    private final List<DataFileMeta> files;
+    private final boolean fileRewrite;
 
-    List<DataFileMeta> files();
+    public CompactUnit(int outputLevel, List<DataFileMeta> files, boolean fileRewrite) {
+        this.outputLevel = outputLevel;
+        this.files = files;
+        this.fileRewrite = fileRewrite;
+    }
+
+    public int outputLevel() {
+        return outputLevel;
+    }
+
+    public List<DataFileMeta> files() {
+        return files;
+    }
+
+    public boolean fileRewrite() {
+        return fileRewrite;
+    }
 
-    static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
+    public static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
         List<DataFileMeta> files = new ArrayList<>();
         for (LevelSortedRun run : runs) {
             files.addAll(run.run().files());
         }
-        return fromFiles(outputLevel, files);
+        return fromFiles(outputLevel, files, false);
     }
 
-    static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {
-        return new CompactUnit() {
-            @Override
-            public int outputLevel() {
-                return outputLevel;
-            }
-
-            @Override
-            public List<DataFileMeta> files() {
-                return files;
-            }
-        };
+    public static CompactUnit fromFiles(
+            int outputLevel, List<DataFileMeta> files, boolean fileRewrite) {
+        return new CompactUnit(outputLevel, files, fileRewrite);
     }
 }
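
As a usage sketch of the reworked API (the call site below is assumed; the signatures come from the diff above): CompactUnit is now a concrete class carrying a fileRewrite flag, and call sites construct it through the two static factories:

    import java.util.List;

    import org.apache.paimon.compact.CompactUnit;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.mergetree.LevelSortedRun;

    class CompactUnitUsageSketch {
        static CompactUnit pick(
                int maxLevel,
                List<LevelSortedRun> runs,
                List<DataFileMeta> files,
                boolean rewrite) {
            if (rewrite) {
                // Explicit file list; the flag marks the unit for per-file rewrite.
                return CompactUnit.fromFiles(maxLevel, files, true);
            }
            // Merge-style unit over sorted runs; fileRewrite() stays false.
            return CompactUnit.fromLevelRuns(maxLevel, runs);
        }
    }
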
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index ef56196e0a..0ab0981963 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /** Compact strategy to decide which files to select for compaction. */
 public interface CompactStrategy {
@@ -56,16 +55,19 @@ public interface CompactStrategy {
             List<LevelSortedRun> runs,
             @Nullable RecordLevelExpire recordLevelExpire,
             @Nullable DeletionVectorsMaintainer dvMaintainer,
-            boolean forceCompactAllFiles) {
+            boolean forceRewriteAllFiles) {
         int maxLevel = numLevels - 1;
         if (runs.isEmpty()) {
             // no sorted run, no need to compact
             return Optional.empty();
-        } else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
+        }
+
+        // only max level files
+        if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
             List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
 
             for (DataFileMeta file : runs.get(0).run().files()) {
-                if (forceCompactAllFiles) {
+                if (forceRewriteAllFiles) {
                     // add all files when force compacted
                     filesToBeCompacted.add(file);
                } else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
@@ -78,27 +80,14 @@ public interface CompactStrategy {
                 }
             }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Pick these files which have expired records or dv 
index for full compaction: {}",
-                        filesToBeCompacted.stream()
-                                .map(
-                                        file ->
-                                                String.format(
-                                                        "(%s, %d, %d)",
-                                                        file.fileName(),
-                                                        file.level(),
-                                                        file.fileSize()))
-                                .collect(Collectors.joining(", ")));
-            }
-
-            if (!filesToBeCompacted.isEmpty()) {
-                return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted));
-            } else {
+            if (filesToBeCompacted.isEmpty()) {
                 return Optional.empty();
             }
-        } else {
-            return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+
+            return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted, true));
         }
+
+        // full compaction
+        return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
     }
 }
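
Condensed, pickFullCompaction now has three outcomes. A hedged caller sketch (the nulls stand in for the optional record-level-expire and deletion-vector arguments; everything else follows the signature above):

    import java.util.List;
    import java.util.Optional;

    import org.apache.paimon.compact.CompactUnit;
    import org.apache.paimon.mergetree.LevelSortedRun;
    import org.apache.paimon.mergetree.compact.CompactStrategy;

    class PickFullCompactionSketch {
        static void pick(int numLevels, List<LevelSortedRun> runs, boolean force) {
            Optional<CompactUnit> picked =
                    CompactStrategy.pickFullCompaction(numLevels, runs, null, null, force);
            // empty           -> nothing to compact;
            // fileRewrite()   -> a single max-level run whose files were picked for
            //                    expired records, dv index, or forced rewrite;
            // !fileRewrite()  -> a full merge compaction over all runs.
            picked.ifPresent(unit -> System.out.println("rewrite=" + unit.fileRewrite()));
        }
    }
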
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java
new file mode 100644
index 0000000000..620ea0748a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+
+/** Compact task for file rewrite compaction. */
+public class FileRewriteCompactTask extends CompactTask {
+
+    private final CompactRewriter rewriter;
+    private final int outputLevel;
+    private final List<DataFileMeta> files;
+    private final boolean dropDelete;
+
+    public FileRewriteCompactTask(
+            CompactRewriter rewriter,
+            CompactUnit unit,
+            boolean dropDelete,
+            @Nullable CompactionMetrics.Reporter metricsReporter) {
+        super(metricsReporter);
+        this.rewriter = rewriter;
+        this.outputLevel = unit.outputLevel();
+        this.files = unit.files();
+        this.dropDelete = dropDelete;
+    }
+
+    @Override
+    protected CompactResult doCompact() throws Exception {
+        CompactResult result = new CompactResult();
+        for (DataFileMeta file : files) {
+            rewriteFile(file, result);
+        }
+        return result;
+    }
+
+    private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception {
+        List<List<SortedRun>> candidate = singletonList(singletonList(SortedRun.fromSingle(file)));
+        toUpdate.merge(rewriter.rewrite(outputLevel, dropDelete, candidate));
+    }
+}
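
A sketch of how the new task could be driven; the helper class and executor wiring are assumptions, and it relies on CompactTask being submittable as a Callable<CompactResult>, as the compact managers in this module do:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    import org.apache.paimon.compact.CompactResult;
    import org.apache.paimon.compact.CompactUnit;
    import org.apache.paimon.mergetree.compact.CompactRewriter;
    import org.apache.paimon.mergetree.compact.FileRewriteCompactTask;

    class FileRewriteTaskSketch {
        static Future<CompactResult> submit(
                ExecutorService executor, CompactRewriter rewriter, CompactUnit unit) {
            // Each input file becomes its own single-file sorted run and is rewritten
            // independently into unit.outputLevel(); the per-file results are merged
            // into one CompactResult. No metrics reporter in this sketch (null).
            return executor.submit(new FileRewriteCompactTask(rewriter, unit, true, null));
        }
    }
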
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 953a235391..025fd20156 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactFutureManager;
 import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
@@ -65,7 +66,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
     private final boolean needLookup;
-    private final boolean forceCompactAllFiles;
+    private final boolean forceRewriteAllFiles;
 
     @Nullable private final RecordLevelExpire recordLevelExpire;
 
@@ -82,7 +83,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
             boolean lazyGenDeletionFile,
             boolean needLookup,
             @Nullable RecordLevelExpire recordLevelExpire,
-            boolean forceCompactAllFiles) {
+            boolean forceRewriteAllFiles) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -95,7 +96,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         this.lazyGenDeletionFile = lazyGenDeletionFile;
         this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
-        this.forceCompactAllFiles = forceCompactAllFiles;
+        this.forceRewriteAllFiles = forceRewriteAllFiles;
 
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
@@ -142,7 +143,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                             runs,
                             recordLevelExpire,
                             dvMaintainer,
-                            forceCompactAllFiles);
+                            forceRewriteAllFiles);
         } else {
             if (taskFuture != null) {
                 return;
@@ -152,7 +153,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
             }
             optionalUnit =
                     strategy.pick(levels.numberOfLevels(), runs)
-                            .filter(unit -> unit.files().size() > 0)
+                            .filter(unit -> !unit.files().isEmpty())
                             .filter(
                                     unit ->
                                             unit.files().size() > 1
@@ -206,22 +207,28 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                            : () -> CompactDeletionFile.generateFiles(dvMaintainer);
         }
 
-        MergeTreeCompactTask task =
-                new MergeTreeCompactTask(
-                        keyComparator,
-                        compactionFileSize,
-                        rewriter,
-                        unit,
-                        dropDelete,
-                        levels.maxLevel(),
-                        metricsReporter,
-                        compactDfSupplier,
-                        dvMaintainer,
-                        recordLevelExpire,
-                        forceCompactAllFiles);
+        CompactTask task;
+        if (unit.fileRewrite()) {
+            task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter);
+        } else {
+            task =
+                    new MergeTreeCompactTask(
+                            keyComparator,
+                            compactionFileSize,
+                            rewriter,
+                            unit,
+                            dropDelete,
+                            levels.maxLevel(),
+                            metricsReporter,
+                            compactDfSupplier,
+                            recordLevelExpire,
+                            forceRewriteAllFiles);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug(
-                    "Pick these files (name, level, size) for compaction: {}",
+                    "Pick these files (name, level, size) for {} compaction: 
{}",
+                    task.getClass().getSimpleName(),
                     unit.files().stream()
                             .map(
                                     file ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 4aa10e9b79..667a965c62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.SortedRun;
@@ -45,19 +44,15 @@ public class MergeTreeCompactTask extends CompactTask {
     private final CompactRewriter rewriter;
     private final int outputLevel;
     private final Supplier<CompactDeletionFile> compactDfSupplier;
-
     private final List<List<SortedRun>> partitioned;
-
     private final boolean dropDelete;
     private final int maxLevel;
+    @Nullable private final RecordLevelExpire recordLevelExpire;
+    private final boolean forceRewriteAllFiles;
 
     // metric
     private int upgradeFilesNum;
 
-    @Nullable private final RecordLevelExpire recordLevelExpire;
-    private final boolean forceCompactAllFiles;
-    @Nullable private final DeletionVectorsMaintainer dvMaintainer;
-
     public MergeTreeCompactTask(
             Comparator<InternalRow> keyComparator,
             long minFileSize,
@@ -67,20 +62,18 @@ public class MergeTreeCompactTask extends CompactTask {
             int maxLevel,
             @Nullable CompactionMetrics.Reporter metricsReporter,
             Supplier<CompactDeletionFile> compactDfSupplier,
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
             @Nullable RecordLevelExpire recordLevelExpire,
-            boolean forceCompactAllFiles) {
+            boolean forceRewriteAllFiles) {
         super(metricsReporter);
         this.minFileSize = minFileSize;
         this.rewriter = rewriter;
         this.outputLevel = unit.outputLevel();
         this.compactDfSupplier = compactDfSupplier;
-        this.dvMaintainer = dvMaintainer;
        this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
         this.dropDelete = dropDelete;
         this.maxLevel = maxLevel;
         this.recordLevelExpire = recordLevelExpire;
-        this.forceCompactAllFiles = forceCompactAllFiles;
+        this.forceRewriteAllFiles = forceRewriteAllFiles;
 
         this.upgradeFilesNum = 0;
     }
@@ -128,34 +121,19 @@ public class MergeTreeCompactTask extends CompactTask {
     }
 
    private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
-        if (file.level() == outputLevel) {
-            if (forceCompactAllFiles
-                    || isContainExpiredRecords(file)
-                    || (dvMaintainer != null
-                            && dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
-                /*
-                 * 1. if files are force picked, we need to rewrite all files.
-                 * 2. if the large file in maxLevel has expired records, we need to rewrite it.
-                 * 3. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it.
-                 */
-                rewriteFile(file, toUpdate);
-            }
+        if ((outputLevel == maxLevel && containsDeleteRecords(file))
+                || forceRewriteAllFiles
+                || containsExpiredRecords(file)) {
+            List<List<SortedRun>> candidate = new ArrayList<>();
+            candidate.add(singletonList(SortedRun.fromSingle(file)));
+            rewriteImpl(candidate, toUpdate);
             return;
         }
 
-        if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
-            if (forceCompactAllFiles || isContainExpiredRecords(file)) {
-                // if all files are force picked, or the file which could be directly upgraded has
-                // expired records, we need to rewrite it
-                rewriteFile(file, toUpdate);
-            } else {
-                CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
-                toUpdate.merge(upgradeResult);
-                upgradeFilesNum++;
-            }
-        } else {
-            // files with delete records should not be upgraded directly to max level
-            rewriteFile(file, toUpdate);
+        if (file.level() != outputLevel) {
+            CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
+            toUpdate.merge(upgradeResult);
+            upgradeFilesNum++;
         }
     }
 
@@ -185,14 +163,11 @@ public class MergeTreeCompactTask extends CompactTask {
         candidate.clear();
     }
 
-    private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception {
-        List<List<SortedRun>> candidate = new ArrayList<>();
-        candidate.add(new ArrayList<>());
-        candidate.get(0).add(SortedRun.fromSingle(file));
-        rewriteImpl(candidate, toUpdate);
+    private boolean containsDeleteRecords(DataFileMeta file) {
+        return file.deleteRowCount().map(d -> d > 0).orElse(true);
     }
 
-    private boolean isContainExpiredRecords(DataFileMeta file) {
+    private boolean containsExpiredRecords(DataFileMeta file) {
        return recordLevelExpire != null && recordLevelExpire.isExpireFile(file);
     }
 }
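
Distilled, the simplified upgrade decision in MergeTreeCompactTask reduces to the boolean below (all names from the diff above). Worth noting: containsDeleteRecords() maps an unknown delete row count to true (orElse(true)), so files without that statistic are never upgraded straight into maxLevel:

    boolean mustRewrite =
            (outputLevel == maxLevel && containsDeleteRecords(file))
                    || forceRewriteAllFiles
                    || containsExpiredRecords(file);
    // mustRewrite                  -> rewrite the file as a single-file sorted run;
    // file.level() != outputLevel  -> metadata-only upgrade via rewriter.upgrade();
    // otherwise                    -> leave the file as is.
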
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index d8f2d9211b..03320ad6e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -91,7 +91,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
                     dvMaintainer,
                     options.compactionMinFileNum(),
                     options.targetFileSize(false),
-                    options.forceCompactAllFiles(),
+                    options.forceRewriteAllFiles(),
                    files -> compactRewrite(partition, bucket, dvFactory, files),
                     compactionMetrics == null
                             ? null
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 7c0b36701f..30fce29821 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -291,7 +291,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                     options.prepareCommitWaitCompaction(),
                     options.needLookup(),
                     recordLevelExpire,
-                    options.forceCompactAllFiles());
+                    options.forceRewriteAllFiles());
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index d58c09d3a8..02001ff1af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -222,7 +222,7 @@ public class CompactProcedureITCase extends CatalogITCaseBase {
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         sql(
                 "CALL sys.compact(`table` => 'default.Tpk',"
-                        + " options => 
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+                        + " options => 
'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
                 tmpPath);
        List<DataSplit> splits = pkTable.newSnapshotReader().read().dataSplits();
         for (DataSplit split : splits) {
@@ -253,7 +253,7 @@ public class CompactProcedureITCase extends CatalogITCaseBase {
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         sql(
                 "CALL sys.compact(`table` => 'default.Tap',"
-                        + " options => 
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+                        + " options => 
'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
                 tmpPath);
         splits = apTable.newSnapshotReader().read().dataSplits();
         for (DataSplit split : splits) {
