This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a44ae965784106f293320979c282c9c9661d74fb
Author: Jiao Mingye <[email protected]>
AuthorDate: Sun Jun 15 09:38:27 2025 +0800

    [core] Support dedicated full compact to external paths (#5674)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 ++++
 .../append/BucketedAppendCompactManager.java       | 19 ++++--
 .../paimon/mergetree/compact/CompactStrategy.java  |  8 ++-
 .../mergetree/compact/MergeTreeCompactManager.java | 14 ++++-
 .../mergetree/compact/MergeTreeCompactTask.java    | 19 +++---
 .../operation/BucketedAppendFileStoreWrite.java    |  1 +
 .../paimon/operation/KeyValueFileStoreWrite.java   |  3 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  1 +
 .../append/BucketedAppendCompactManagerTest.java   |  1 +
 .../apache/paimon/append/FullCompactTaskTest.java  |  2 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |  2 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  6 +-
 .../compact/MergeTreeCompactManagerTest.java       |  9 ++-
 .../flink/procedure/CompactProcedureITCase.java    | 67 ++++++++++++++++++++++
 15 files changed, 144 insertions(+), 25 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 8cd61f61d6..6bee1104a1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
             <td>Double</td>
             <td>Ratio of the deleted rows in a data file to be forced 
compacted for append-only table.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.force-compact-all-files</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force pick all files for a full compaction. Usually 
seen in a compaction task to external paths.</td>
+        </tr>
         <tr>
             <td><h5>compaction.force-up-level-0</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 1df7949acc..47967be549 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -151,6 +151,13 @@ public class CoreOptions implements Serializable {
                                     + ExternalPathStrategy.SPECIFIC_FS
                                     + ", should be the prefix scheme of the 
external path, now supported are s3 and oss.");
 
+    public static final ConfigOption<Boolean> 
COMPACTION_FORCE_COMPACT_ALL_FILES =
+            key("compaction.force-compact-all-files")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to force pick all files for a full 
compaction. Usually seen in a compaction task to external paths.");
+
     @ExcludeFromDocumentation("Internal use only")
     public static final ConfigOption<String> PATH =
             key("path")
@@ -2466,6 +2473,10 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
     }
 
+    public Boolean forceCompactAllFiles() {
+        return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
+    }
+
     public String partitionTimestampFormatter() {
         return options.get(PARTITION_TIMESTAMP_FORMATTER);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index d5c47c3b88..520fd8edb3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -60,6 +60,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     private final PriorityQueue<DataFileMeta> toCompact;
     private final int minFileNum;
     private final long targetFileSize;
+    private final boolean forceCompactAllFiles;
     private final CompactRewriter rewriter;
 
     private List<DataFileMeta> compacting;
@@ -72,6 +73,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
             @Nullable DeletionVectorsMaintainer dvMaintainer,
             int minFileNum,
             long targetFileSize,
+            boolean forceCompactAllFiles,
             CompactRewriter rewriter,
             @Nullable CompactionMetrics.Reporter metricsReporter) {
         this.executor = executor;
@@ -80,6 +82,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
         this.toCompact.addAll(restored);
         this.minFileNum = minFileNum;
         this.targetFileSize = targetFileSize;
+        this.forceCompactAllFiles = forceCompactAllFiles;
         this.rewriter = rewriter;
         this.metricsReporter = metricsReporter;
     }
@@ -98,9 +101,10 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
                 taskFuture == null,
                 "A compaction task is still running while the user "
                         + "forces a new compaction. This is unexpected.");
-        // if deletion vector enables, always trigger compaction.
-        if (toCompact.isEmpty()
-                || (dvMaintainer == null && toCompact.size() < 
FULL_COMPACT_MIN_FILE)) {
+        // if all files are force picked or deletion vectors are enabled, always 
trigger compaction.
+        if (!forceCompactAllFiles
+                && (toCompact.isEmpty()
+                        || (dvMaintainer == null && toCompact.size() < 
FULL_COMPACT_MIN_FILE))) {
             return;
         }
 
@@ -114,6 +118,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
                                 dvMaintainer,
                                 toCompact,
                                 targetFileSize,
+                                forceCompactAllFiles,
                                 rewriter,
                                 metricsReporter));
         recordCompactionsQueuedRequest();
@@ -238,25 +243,28 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
         private final DeletionVectorsMaintainer dvMaintainer;
         private final LinkedList<DataFileMeta> toCompact;
         private final long targetFileSize;
+        private final boolean forceCompactAllFiles;
         private final CompactRewriter rewriter;
 
         public FullCompactTask(
                 DeletionVectorsMaintainer dvMaintainer,
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
+                boolean forceCompactAllFiles,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
             this.dvMaintainer = dvMaintainer;
             this.toCompact = new LinkedList<>(inputs);
             this.targetFileSize = targetFileSize;
+            this.forceCompactAllFiles = forceCompactAllFiles;
             this.rewriter = rewriter;
         }
 
         @Override
         protected CompactResult doCompact() throws Exception {
             // remove large files
-            while (!toCompact.isEmpty()) {
+            while (!forceCompactAllFiles && !toCompact.isEmpty()) {
                 DataFileMeta file = toCompact.peekFirst();
                 // the data file with deletion file always need to be 
compacted.
                 if (file.fileSize() >= targetFileSize && 
!hasDeletionFile(file)) {
@@ -281,7 +289,8 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
                         small++;
                     }
                 }
-                if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
+                if (forceCompactAllFiles
+                        || (small > big && toCompact.size() >= 
FULL_COMPACT_MIN_FILE)) {
                     return compact(null, toCompact, rewriter);
                 } else {
                     return result(emptyList(), emptyList());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index ec82e9e530..ef56196e0a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -55,7 +55,8 @@ public interface CompactStrategy {
             int numLevels,
             List<LevelSortedRun> runs,
             @Nullable RecordLevelExpire recordLevelExpire,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            boolean forceCompactAllFiles) {
         int maxLevel = numLevels - 1;
         if (runs.isEmpty()) {
             // no sorted run, no need to compact
@@ -64,7 +65,10 @@ public interface CompactStrategy {
             List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
 
             for (DataFileMeta file : runs.get(0).run().files()) {
-                if (recordLevelExpire != null && 
recordLevelExpire.isExpireFile(file)) {
+                if (forceCompactAllFiles) {
+                    // add all files when force compacted
+                    filesToBeCompacted.add(file);
+                } else if (recordLevelExpire != null && 
recordLevelExpire.isExpireFile(file)) {
                     // check record level expire for large files
                     filesToBeCompacted.add(file);
                 } else if (dvMaintainer != null
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index a4852e7346..953a235391 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
     private final boolean needLookup;
+    private final boolean forceCompactAllFiles;
 
     @Nullable private final RecordLevelExpire recordLevelExpire;
 
@@ -80,7 +81,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             @Nullable DeletionVectorsMaintainer dvMaintainer,
             boolean lazyGenDeletionFile,
             boolean needLookup,
-            @Nullable RecordLevelExpire recordLevelExpire) {
+            @Nullable RecordLevelExpire recordLevelExpire,
+            boolean forceCompactAllFiles) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -93,6 +95,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
         this.lazyGenDeletionFile = lazyGenDeletionFile;
         this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
+        this.forceCompactAllFiles = forceCompactAllFiles;
 
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
@@ -135,7 +138,11 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             }
             optionalUnit =
                     CompactStrategy.pickFullCompaction(
-                            levels.numberOfLevels(), runs, recordLevelExpire, 
dvMaintainer);
+                            levels.numberOfLevels(),
+                            runs,
+                            recordLevelExpire,
+                            dvMaintainer,
+                            forceCompactAllFiles);
         } else {
             if (taskFuture != null) {
                 return;
@@ -210,7 +217,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
                         metricsReporter,
                         compactDfSupplier,
                         dvMaintainer,
-                        recordLevelExpire);
+                        recordLevelExpire,
+                        forceCompactAllFiles);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Pick these files (name, level, size) for compaction: {}",
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 0ce9fbb4c2..4aa10e9b79 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -55,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
     private int upgradeFilesNum;
 
     @Nullable private final RecordLevelExpire recordLevelExpire;
+    private final boolean forceCompactAllFiles;
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
 
     public MergeTreeCompactTask(
@@ -67,7 +68,8 @@ public class MergeTreeCompactTask extends CompactTask {
             @Nullable CompactionMetrics.Reporter metricsReporter,
             Supplier<CompactDeletionFile> compactDfSupplier,
             @Nullable DeletionVectorsMaintainer dvMaintainer,
-            @Nullable RecordLevelExpire recordLevelExpire) {
+            @Nullable RecordLevelExpire recordLevelExpire,
+            boolean forceCompactAllFiles) {
         super(metricsReporter);
         this.minFileSize = minFileSize;
         this.rewriter = rewriter;
@@ -78,6 +80,7 @@ public class MergeTreeCompactTask extends CompactTask {
         this.dropDelete = dropDelete;
         this.maxLevel = maxLevel;
         this.recordLevelExpire = recordLevelExpire;
+        this.forceCompactAllFiles = forceCompactAllFiles;
 
         this.upgradeFilesNum = 0;
     }
@@ -126,12 +129,14 @@ public class MergeTreeCompactTask extends CompactTask {
 
     private void upgrade(DataFileMeta file, CompactResult toUpdate) throws 
Exception {
         if (file.level() == outputLevel) {
-            if (isContainExpiredRecords(file)
+            if (forceCompactAllFiles
+                    || isContainExpiredRecords(file)
                     || (dvMaintainer != null
                             && 
dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
                 /*
-                 * 1. if the large file in maxLevel has expired records, we 
need to rewrite it.
-                 * 2. if the large file in maxLevel has corresponding deletion 
vector, we need to rewrite it.
+                 * 1. if files are force picked, we need to rewrite all files.
+                 * 2. if the large file in maxLevel has expired records, we 
need to rewrite it.
+                 * 3. if the large file in maxLevel has corresponding deletion 
vector, we need to rewrite it.
                  */
                 rewriteFile(file, toUpdate);
             }
@@ -139,9 +144,9 @@ public class MergeTreeCompactTask extends CompactTask {
         }
 
         if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 
0).orElse(false)) {
-            if (isContainExpiredRecords(file)) {
-                // if the file which could be directly upgraded has expired 
records, we need to
-                // rewrite it
+            if (forceCompactAllFiles || isContainExpiredRecords(file)) {
+                // if all files are force picked, or the file which could be 
directly upgraded has
+                // expired records, we need to rewrite it
                 rewriteFile(file, toUpdate);
             } else {
                 CompactResult upgradeResult = rewriter.upgrade(outputLevel, 
file);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 36201c7f7f..d8f2d9211b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -91,6 +91,7 @@ public class BucketedAppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
                     dvMaintainer,
                     options.compactionMinFileNum(),
                     options.targetFileSize(false),
+                    options.forceCompactAllFiles(),
                     files -> compactRewrite(partition, bucket, dvFactory, 
files),
                     compactionMetrics == null
                             ? null
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 9a65e83d06..7c0b36701f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -290,7 +290,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     dvMaintainer,
                     options.prepareCommitWaitCompaction(),
                     options.needLookup(),
-                    recordLevelExpire);
+                    recordLevelExpire,
+                    options.forceCompactAllFiles());
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2b280220ee..8cb293cd8e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -652,6 +652,7 @@ public class AppendOnlyWriterTest {
                         null,
                         MIN_FILE_NUM,
                         targetFileSize,
+                        false,
                         compactBefore -> {
                             latch.await();
                             return compactBefore.isEmpty()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
index cfdf38558f..2f031548a8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
@@ -207,6 +207,7 @@ public class BucketedAppendCompactManagerTest {
                         null,
                         minFileNum,
                         targetFileSize,
+                        false,
                         null, // not used
                         null);
         Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index e7c3cce01d..262acd5d4f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 BucketedAppendCompactManager.CompactRewriter rewriter) {
-            super(null, inputs, targetFileSize, rewriter, null);
+            super(null, inputs, targetFileSize, false, rewriter, null);
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index c43b3c20c6..c6761d9c92 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -87,7 +87,7 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         SCHEMA,
                         0,
                         new BucketedAppendCompactManager(
-                                null, toCompact, null, 4, 10, null, null), // 
not used
+                                null, toCompact, null, 4, 10, false, null, 
null), // not used
                         null,
                         false,
                         dataFilePathFactory,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 4af6c39800..95daf384e7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -454,7 +454,8 @@ public abstract class MergeTreeTestBase {
                 null,
                 false,
                 options.needLookup(),
-                null);
+                null,
+                false);
     }
 
     static class MockFailResultCompactionManager extends 
MergeTreeCompactManager {
@@ -478,7 +479,8 @@ public abstract class MergeTreeTestBase {
                     null,
                     false,
                     false,
-                    null);
+                    null,
+                    false);
         }
 
         protected CompactResult obtainCompactResult()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4240555977..4adff94778 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -208,7 +208,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         false,
                         true,
-                        null);
+                        null,
+                        false);
 
         MergeTreeCompactManager defaultManager =
                 new MergeTreeCompactManager(
@@ -223,7 +224,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         false,
                         false,
-                        null);
+                        null,
+                        false);
 
         assertThat(lookupManager.compactNotCompleted()).isTrue();
         assertThat(defaultManager.compactNotCompleted()).isFalse();
@@ -259,7 +261,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         false,
                         false,
-                        null);
+                        null,
+                        false);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index d79d13f026..d58c09d3a8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -35,6 +36,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -196,6 +198,71 @@ public class CompactProcedureITCase extends 
CatalogITCaseBase {
         }
     }
 
+    @Test
+    public void testForceCompactToExternalPath() throws Exception {
+        // test for pk table
+        String tmpPath = getTempDirPath("external/" + UUID.randomUUID());
+        sql(
+                "CREATE TABLE Tpk ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable pkTable = paimonTable("Tpk");
+
+        sql(
+                "INSERT INTO Tpk VALUES (1, 100, 15, '20221208'), (1, 100, 16, 
'20221208'), (1, 100, 15, '20221209')");
+        sql(
+                "INSERT INTO Tpk VALUES (2, 100, 15, '20221208'), (2, 100, 16, 
'20221208'), (2, 100, 15, '20221209')");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql(
+                "CALL sys.compact(`table` => 'default.Tpk',"
+                        + " options => 
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+                tmpPath);
+        List<DataSplit> splits = 
pkTable.newSnapshotReader().read().dataSplits();
+        for (DataSplit split : splits) {
+            for (DataFileMeta meta : split.dataFiles()) {
+                assertThat(meta.externalPath().get().startsWith("file:" + 
tmpPath)).isTrue();
+            }
+        }
+
+        // test for append table
+        tmpPath = getTempDirPath("external/" + UUID.randomUUID());
+        sql(
+                "CREATE TABLE Tap ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1',"
+                        + " 'bucket-key' = 'k'"
+                        + ")");
+        FileStoreTable apTable = paimonTable("Tap");
+
+        sql(
+                "INSERT INTO Tap VALUES (1, 100, 15, '20221208'), (1, 100, 16, 
'20221208'), (1, 100, 15, '20221209')");
+        sql(
+                "INSERT INTO Tap VALUES (2, 100, 15, '20221208'), (2, 100, 16, 
'20221208'), (2, 100, 15, '20221209')");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql(
+                "CALL sys.compact(`table` => 'default.Tap',"
+                        + " options => 
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+                tmpPath);
+        splits = apTable.newSnapshotReader().read().dataSplits();
+        for (DataSplit split : splits) {
+            for (DataFileMeta meta : split.dataFiles()) {
+                assertThat(meta.externalPath().get().startsWith("file:" + 
tmpPath)).isTrue();
+            }
+        }
+    }
+
     // ----------------------- Sort Compact -----------------------
 
     @Test

Reply via email to