This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 7306a0ac4b4df5fe84034835df087f63937aca45 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Jun 16 10:16:53 2025 +0800 [core] Simplify force rewrite files in Compact Task (#5751) --- .../shortcodes/generated/core_configuration.html | 2 +- .../main/java/org/apache/paimon/CoreOptions.java | 8 +-- .../append/BucketedAppendCompactManager.java | 20 +++---- .../org/apache/paimon/compact/CompactUnit.java | 43 ++++++++------ .../paimon/mergetree/compact/CompactStrategy.java | 35 ++++------- .../mergetree/compact/FileRewriteCompactTask.java | 67 ++++++++++++++++++++++ .../mergetree/compact/MergeTreeCompactManager.java | 45 +++++++++------ .../mergetree/compact/MergeTreeCompactTask.java | 59 ++++++------------- .../operation/BucketedAppendFileStoreWrite.java | 2 +- .../paimon/operation/KeyValueFileStoreWrite.java | 2 +- .../flink/procedure/CompactProcedureITCase.java | 4 +- 11 files changed, 167 insertions(+), 120 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 6bee1104a1..d7b21f18f7 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -183,7 +183,7 @@ under the License. <td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td> </tr> <tr> - <td><h5>compaction.force-compact-all-files</h5></td> + <td><h5>compaction.force-rewrite-all-files</h5></td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> <td>Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 47967be549..4c446dc4e5 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -151,8 +151,8 @@ public class CoreOptions implements Serializable { + ExternalPathStrategy.SPECIFIC_FS + ", should be the prefix scheme of the external path, now supported are s3 and oss."); - public static final ConfigOption<Boolean> COMPACTION_FORCE_COMPACT_ALL_FILES = - key("compaction.force-compact-all-files") + public static final ConfigOption<Boolean> COMPACTION_FORCE_REWRITE_ALL_FILES = + key("compaction.force-rewrite-all-files") .booleanType() .defaultValue(false) .withDescription( @@ -2473,8 +2473,8 @@ public class CoreOptions implements Serializable { return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS); } - public Boolean forceCompactAllFiles() { - return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES); + public Boolean forceRewriteAllFiles() { + return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES); } public String partitionTimestampFormatter() { diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java index 520fd8edb3..a27a40e8a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java @@ -60,7 +60,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { private final PriorityQueue<DataFileMeta> toCompact; private final int minFileNum; private final long targetFileSize; - private final boolean forceCompactAllFiles; + private final boolean forceRewriteAllFiles; private final CompactRewriter rewriter; private List<DataFileMeta> compacting; @@ -73,7 +73,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { @Nullable DeletionVectorsMaintainer dvMaintainer, int minFileNum, long targetFileSize, - boolean forceCompactAllFiles, + boolean forceRewriteAllFiles, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { this.executor = executor; @@ -82,7 +82,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { this.toCompact.addAll(restored); this.minFileNum = minFileNum; this.targetFileSize = targetFileSize; - this.forceCompactAllFiles = forceCompactAllFiles; + this.forceRewriteAllFiles = forceRewriteAllFiles; this.rewriter = rewriter; this.metricsReporter = metricsReporter; } @@ -102,7 +102,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { "A compaction task is still running while the user " + "forces a new compaction. This is unexpected."); // if all files are force picked or deletion vector enables, always trigger compaction. - if (!forceCompactAllFiles + if (!forceRewriteAllFiles && (toCompact.isEmpty() || (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE))) { return; @@ -118,7 +118,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { dvMaintainer, toCompact, targetFileSize, - forceCompactAllFiles, + forceRewriteAllFiles, rewriter, metricsReporter)); recordCompactionsQueuedRequest(); @@ -243,28 +243,28 @@ public class BucketedAppendCompactManager extends CompactFutureManager { private final DeletionVectorsMaintainer dvMaintainer; private final LinkedList<DataFileMeta> toCompact; private final long targetFileSize; - private final boolean forceCompactAllFiles; + private final boolean forceRewriteAllFiles; private final CompactRewriter rewriter; public FullCompactTask( DeletionVectorsMaintainer dvMaintainer, Collection<DataFileMeta> inputs, long targetFileSize, - boolean forceCompactAllFiles, + boolean forceRewriteAllFiles, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { super(metricsReporter); this.dvMaintainer = dvMaintainer; this.toCompact = new LinkedList<>(inputs); this.targetFileSize = targetFileSize; - this.forceCompactAllFiles = forceCompactAllFiles; + this.forceRewriteAllFiles = forceRewriteAllFiles; this.rewriter = rewriter; } @Override protected CompactResult doCompact() throws Exception { // remove large files - while (!forceCompactAllFiles && !toCompact.isEmpty()) { + while (!forceRewriteAllFiles && !toCompact.isEmpty()) { DataFileMeta file = toCompact.peekFirst(); // the data file with deletion file always need to be compacted. if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) { @@ -289,7 +289,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { small++; } } - if (forceCompactAllFiles + if (forceRewriteAllFiles || (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE)) { return compact(null, toCompact, rewriter); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java index 9b0fdd26ad..ae367f9171 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java @@ -25,31 +25,40 @@ import java.util.ArrayList; import java.util.List; /** A files unit for compaction. */ -public interface CompactUnit { +public class CompactUnit { - int outputLevel(); + private final int outputLevel; + private final List<DataFileMeta> files; + private final boolean fileRewrite; - List<DataFileMeta> files(); + public CompactUnit(int outputLevel, List<DataFileMeta> files, boolean fileRewrite) { + this.outputLevel = outputLevel; + this.files = files; + this.fileRewrite = fileRewrite; + } + + public int outputLevel() { + return outputLevel; + } + + public List<DataFileMeta> files() { + return files; + } + + public boolean fileRewrite() { + return fileRewrite; + } - static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) { + public static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) { List<DataFileMeta> files = new ArrayList<>(); for (LevelSortedRun run : runs) { files.addAll(run.run().files()); } - return fromFiles(outputLevel, files); + return fromFiles(outputLevel, files, false); } - static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) { - return new CompactUnit() { - @Override - public int outputLevel() { - return outputLevel; - } - - @Override - public List<DataFileMeta> files() { - return files; - } - }; + public static CompactUnit fromFiles( + int outputLevel, List<DataFileMeta> files, boolean fileRewrite) { + return new CompactUnit(outputLevel, files, fileRewrite); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java index ef56196e0a..0ab0981963 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java @@ -32,7 +32,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** Compact strategy to decide which files to select for compaction. */ public interface CompactStrategy { @@ -56,16 +55,19 @@ public interface CompactStrategy { List<LevelSortedRun> runs, @Nullable RecordLevelExpire recordLevelExpire, @Nullable DeletionVectorsMaintainer dvMaintainer, - boolean forceCompactAllFiles) { + boolean forceRewriteAllFiles) { int maxLevel = numLevels - 1; if (runs.isEmpty()) { // no sorted run, no need to compact return Optional.empty(); - } else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) { + } + + // only max level files + if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) { List<DataFileMeta> filesToBeCompacted = new ArrayList<>(); for (DataFileMeta file : runs.get(0).run().files()) { - if (forceCompactAllFiles) { + if (forceRewriteAllFiles) { // add all files when force compacted filesToBeCompacted.add(file); } else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) { @@ -78,27 +80,14 @@ public interface CompactStrategy { } } - if (LOG.isDebugEnabled()) { - LOG.debug( - "Pick these files which have expired records or dv index for full compaction: {}", - filesToBeCompacted.stream() - .map( - file -> - String.format( - "(%s, %d, %d)", - file.fileName(), - file.level(), - file.fileSize())) - .collect(Collectors.joining(", "))); - } - - if (!filesToBeCompacted.isEmpty()) { - return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted)); - } else { + if (filesToBeCompacted.isEmpty()) { return Optional.empty(); } - } else { - return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs)); + + return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted, true)); } + + // full compaction + return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java new file mode 100644 index 0000000000..620ea0748a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.compact.CompactResult; +import org.apache.paimon.compact.CompactTask; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.operation.metrics.CompactionMetrics; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Collections.singletonList; + +/** Compact task for file rewrite compaction. */ +public class FileRewriteCompactTask extends CompactTask { + + private final CompactRewriter rewriter; + private final int outputLevel; + private final List<DataFileMeta> files; + private final boolean dropDelete; + + public FileRewriteCompactTask( + CompactRewriter rewriter, + CompactUnit unit, + boolean dropDelete, + @Nullable CompactionMetrics.Reporter metricsReporter) { + super(metricsReporter); + this.rewriter = rewriter; + this.outputLevel = unit.outputLevel(); + this.files = unit.files(); + this.dropDelete = dropDelete; + } + + @Override + protected CompactResult doCompact() throws Exception { + CompactResult result = new CompactResult(); + for (DataFileMeta file : files) { + rewriteFile(file, result); + } + return result; + } + + private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception { + List<List<SortedRun>> candidate = singletonList(singletonList(SortedRun.fromSingle(file))); + toUpdate.merge(rewriter.rewrite(outputLevel, dropDelete, candidate)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index 953a235391..025fd20156 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactFutureManager; import org.apache.paimon.compact.CompactResult; +import org.apache.paimon.compact.CompactTask; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; @@ -65,7 +66,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { @Nullable private final DeletionVectorsMaintainer dvMaintainer; private final boolean lazyGenDeletionFile; private final boolean needLookup; - private final boolean forceCompactAllFiles; + private final boolean forceRewriteAllFiles; @Nullable private final RecordLevelExpire recordLevelExpire; @@ -82,7 +83,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { boolean lazyGenDeletionFile, boolean needLookup, @Nullable RecordLevelExpire recordLevelExpire, - boolean forceCompactAllFiles) { + boolean forceRewriteAllFiles) { this.executor = executor; this.levels = levels; this.strategy = strategy; @@ -95,7 +96,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { this.lazyGenDeletionFile = lazyGenDeletionFile; this.recordLevelExpire = recordLevelExpire; this.needLookup = needLookup; - this.forceCompactAllFiles = forceCompactAllFiles; + this.forceRewriteAllFiles = forceRewriteAllFiles; MetricUtils.safeCall(this::reportMetrics, LOG); } @@ -142,7 +143,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { runs, recordLevelExpire, dvMaintainer, - forceCompactAllFiles); + forceRewriteAllFiles); } else { if (taskFuture != null) { return; @@ -152,7 +153,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { } optionalUnit = strategy.pick(levels.numberOfLevels(), runs) - .filter(unit -> unit.files().size() > 0) + .filter(unit -> !unit.files().isEmpty()) .filter( unit -> unit.files().size() > 1 @@ -206,22 +207,28 @@ public class MergeTreeCompactManager extends CompactFutureManager { : () -> CompactDeletionFile.generateFiles(dvMaintainer); } - MergeTreeCompactTask task = - new MergeTreeCompactTask( - keyComparator, - compactionFileSize, - rewriter, - unit, - dropDelete, - levels.maxLevel(), - metricsReporter, - compactDfSupplier, - dvMaintainer, - recordLevelExpire, - forceCompactAllFiles); + CompactTask task; + if (unit.fileRewrite()) { + task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter); + } else { + task = + new MergeTreeCompactTask( + keyComparator, + compactionFileSize, + rewriter, + unit, + dropDelete, + levels.maxLevel(), + metricsReporter, + compactDfSupplier, + recordLevelExpire, + forceRewriteAllFiles); + } + if (LOG.isDebugEnabled()) { LOG.debug( - "Pick these files (name, level, size) for compaction: {}", + "Pick these files (name, level, size) for {} compaction: {}", + task.getClass().getSimpleName(), unit.files().stream() .map( file -> diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java index 4aa10e9b79..667a965c62 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java @@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.RecordLevelExpire; import org.apache.paimon.mergetree.SortedRun; @@ -45,19 +44,15 @@ public class MergeTreeCompactTask extends CompactTask { private final CompactRewriter rewriter; private final int outputLevel; private final Supplier<CompactDeletionFile> compactDfSupplier; - private final List<List<SortedRun>> partitioned; - private final boolean dropDelete; private final int maxLevel; + @Nullable private final RecordLevelExpire recordLevelExpire; + private final boolean forceRewriteAllFiles; // metric private int upgradeFilesNum; - @Nullable private final RecordLevelExpire recordLevelExpire; - private final boolean forceCompactAllFiles; - @Nullable private final DeletionVectorsMaintainer dvMaintainer; - public MergeTreeCompactTask( Comparator<InternalRow> keyComparator, long minFileSize, @@ -67,20 +62,18 @@ public class MergeTreeCompactTask extends CompactTask { int maxLevel, @Nullable CompactionMetrics.Reporter metricsReporter, Supplier<CompactDeletionFile> compactDfSupplier, - @Nullable DeletionVectorsMaintainer dvMaintainer, @Nullable RecordLevelExpire recordLevelExpire, - boolean forceCompactAllFiles) { + boolean forceRewriteAllFiles) { super(metricsReporter); this.minFileSize = minFileSize; this.rewriter = rewriter; this.outputLevel = unit.outputLevel(); this.compactDfSupplier = compactDfSupplier; - this.dvMaintainer = dvMaintainer; this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition(); this.dropDelete = dropDelete; this.maxLevel = maxLevel; this.recordLevelExpire = recordLevelExpire; - this.forceCompactAllFiles = forceCompactAllFiles; + this.forceRewriteAllFiles = forceRewriteAllFiles; this.upgradeFilesNum = 0; } @@ -128,34 +121,19 @@ public class MergeTreeCompactTask extends CompactTask { } private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception { - if (file.level() == outputLevel) { - if (forceCompactAllFiles - || isContainExpiredRecords(file) - || (dvMaintainer != null - && dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) { - /* - * 1. if files are force picked, we need to rewrite all files. - * 2. if the large file in maxLevel has expired records, we need to rewrite it. - * 3. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it. - */ - rewriteFile(file, toUpdate); - } + if ((outputLevel == maxLevel && containsDeleteRecords(file)) + || forceRewriteAllFiles + || containsExpiredRecords(file)) { + List<List<SortedRun>> candidate = new ArrayList<>(); + candidate.add(singletonList(SortedRun.fromSingle(file))); + rewriteImpl(candidate, toUpdate); return; } - if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) { - if (forceCompactAllFiles || isContainExpiredRecords(file)) { - // if all files are force picked, or the file which could be directly upgraded has - // expired records, we need to rewrite it - rewriteFile(file, toUpdate); - } else { - CompactResult upgradeResult = rewriter.upgrade(outputLevel, file); - toUpdate.merge(upgradeResult); - upgradeFilesNum++; - } - } else { - // files with delete records should not be upgraded directly to max level - rewriteFile(file, toUpdate); + if (file.level() != outputLevel) { + CompactResult upgradeResult = rewriter.upgrade(outputLevel, file); + toUpdate.merge(upgradeResult); + upgradeFilesNum++; } } @@ -185,14 +163,11 @@ public class MergeTreeCompactTask extends CompactTask { candidate.clear(); } - private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception { - List<List<SortedRun>> candidate = new ArrayList<>(); - candidate.add(new ArrayList<>()); - candidate.get(0).add(SortedRun.fromSingle(file)); - rewriteImpl(candidate, toUpdate); + private boolean containsDeleteRecords(DataFileMeta file) { + return file.deleteRowCount().map(d -> d > 0).orElse(true); } - private boolean isContainExpiredRecords(DataFileMeta file) { + private boolean containsExpiredRecords(DataFileMeta file) { return recordLevelExpire != null && recordLevelExpire.isExpireFile(file); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index d8f2d9211b..03320ad6e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -91,7 +91,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite { dvMaintainer, options.compactionMinFileNum(), options.targetFileSize(false), - options.forceCompactAllFiles(), + options.forceRewriteAllFiles(), files -> compactRewrite(partition, bucket, dvFactory, files), compactionMetrics == null ? null diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 7c0b36701f..30fce29821 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -291,7 +291,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { options.prepareCommitWaitCompaction(), options.needLookup(), recordLevelExpire, - options.forceCompactAllFiles()); + options.forceRewriteAllFiles()); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java index d58c09d3a8..02001ff1af 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java @@ -222,7 +222,7 @@ public class CompactProcedureITCase extends CatalogITCaseBase { tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); sql( "CALL sys.compact(`table` => 'default.Tpk'," - + " options => 'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", + + " options => 'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", tmpPath); List<DataSplit> splits = pkTable.newSnapshotReader().read().dataSplits(); for (DataSplit split : splits) { @@ -253,7 +253,7 @@ public class CompactProcedureITCase extends CatalogITCaseBase { tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); sql( "CALL sys.compact(`table` => 'default.Tap'," - + " options => 'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", + + " options => 'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", tmpPath); splits = apTable.newSnapshotReader().read().dataSplits(); for (DataSplit split : splits) {
