This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit f684400125ea758ceb70afb82ee969ec857d1082
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 26 21:10:14 2025 +0800

    [core] Apply 'file-operation.thread-num' to commit (#6339)
---
 .../shortcodes/generated/core_configuration.html   | 12 +++++------
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++++-----
 ...hreadPool.java => FileOperationThreadPool.java} |  6 +++---
 .../java/org/apache/paimon/AbstractFileStore.java  |  6 +++---
 .../apache/paimon/operation/FileDeletionBase.java  |  4 ++--
 .../paimon/operation/ListUnexistingFiles.java      |  7 +++----
 .../paimon/operation/LocalOrphanFilesClean.java    |  4 ++--
 .../paimon/table/AbstractFileStoreTable.java       |  3 ++-
 .../apache/paimon/table/sink/TableCommitImpl.java  | 24 +++++++++-------------
 .../apache/paimon/operation/FileDeletionTest.java  |  4 ++--
 10 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 2ca6b6c7bc..0a6f7f2eb9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -362,12 +362,6 @@ under the License.
             <td>Boolean</td>
             <td>Enable data file thin mode to avoid duplicate columns storage.</td>
         </tr>
-        <tr>
-            <td><h5>delete-file.thread-num</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Integer</td>
-            <td>The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</td>
-        </tr>
         <tr>
             <td><h5>delete.force-produce-changelog</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -452,6 +446,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether enabled read file index.</td>
         </tr>
+        <tr>
+            <td><h5>file-operation.thread-num</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The maximum number of concurrent file operations. By default is the number of processors available to the Java virtual machine.</td>
+        </tr>
         <tr>
             <td><h5>file-reader-async-threshold</h5></td>
             <td style="word-wrap: break-word;">10 mb</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ad7bb024f2..355a5fe357 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1769,12 +1769,13 @@ public class CoreOptions implements Serializable {
                                     + "a forced lookup compaction will be performed to flush L0 files to higher level. "
                                     + "This option is only valid when lookup-compact mode is gentle.");
 
-    public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
-            key("delete-file.thread-num")
+    public static final ConfigOption<Integer> FILE_OPERATION_THREAD_NUM =
+            key("file-operation.thread-num")
                     .intType()
                     .noDefaultValue()
+                    .withFallbackKeys("delete-file.thread-num")
                     .withDescription(
-                            "The maximum number of concurrent deleting files. "
+                            "The maximum number of concurrent file operations. "
                                     + "By default is the number of processors available to the Java virtual machine.");
 
     public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
@@ -2266,8 +2267,8 @@ public class CoreOptions implements Serializable {
         return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
     }
 
-    public int deleteFileThreadNum() {
-        return options.getOptional(DELETE_FILE_THREAD_NUM)
+    public int fileOperationThreadNum() {
+        return options.getOptional(FILE_OPERATION_THREAD_NUM)
                 .orElseGet(() -> Runtime.getRuntime().availableProcessors());
     }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
similarity index 89%
rename from paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
rename to paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
index 638d6f9a4c..87c096c50d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
-/** Thread pool to delete files using {@link FileIO}. */
-public class FileDeletionThreadPool {
+/** Thread pool to operate files using {@link FileIO}. */
+public class FileOperationThreadPool {
 
-    private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL";
+    private static final String THREAD_NAME = "FILE-OPERATION-THREAD-POOL";
 
     private static ThreadPoolExecutor executorService =
             createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME);
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2d71f61580..f4a77f877d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -312,7 +312,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 newStatsFileHandler(),
                 options.changelogProducer() != CoreOptions.ChangelogProducer.NONE,
                 options.cleanEmptyDirectories(),
-                options.deleteFileThreadNum());
+                options.fileOperationThreadNum());
     }
 
     @Override
@@ -325,7 +325,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 newIndexFileHandler(),
                 newStatsFileHandler(),
                 options.cleanEmptyDirectories(),
-                options.deleteFileThreadNum());
+                options.fileOperationThreadNum());
     }
 
     @Override
@@ -343,7 +343,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 newIndexFileHandler(),
                 newStatsFileHandler(),
                 options.cleanEmptyDirectories(),
-                options.deleteFileThreadNum());
+                options.fileOperationThreadNum());
     }
 
     public abstract Comparator<InternalRow> newKeyComparator();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index dae3d5a0f2..98a761b471 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -35,7 +35,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.utils.DataFilePathFactories;
-import org.apache.paimon.utils.FileDeletionThreadPool;
+import org.apache.paimon.utils.FileOperationThreadPool;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
@@ -102,7 +102,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
         this.statsFileHandler = statsFileHandler;
         this.cleanEmptyDirectories = cleanEmptyDirectories;
         this.deletionBuckets = new HashMap<>();
-        this.deleteFileExecutor = FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
+        this.deleteFileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
     }
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
index a704a329c7..c47916871a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.FileOperationThreadPool;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ThreadPoolUtils;
 
@@ -38,8 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-
 /** List what data files recorded in manifests are missing from the filesystem. */
 public class ListUnexistingFiles {
 
@@ -51,8 +50,8 @@ public class ListUnexistingFiles {
         this.table = table;
         this.pathFactory = table.store().pathFactory();
         this.executor =
-                createCachedThreadPool(
-                        table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES");
+                FileOperationThreadPool.getExecutorService(
+                        table.coreOptions().fileOperationThreadNum());
     }
 
     public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws Exception {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 9456e6c9aa..34ff3fb721 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -89,7 +89,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
         this.deleteFiles = new ArrayList<>();
         this.executor =
                 createCachedThreadPool(
-                        table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
+                        table.coreOptions().fileOperationThreadNum(), "ORPHAN_FILES_CLEAN");
         this.dryRun = dryRun;
     }
 
@@ -276,7 +276,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                         : new HashMap<String, String>() {
                             {
                                 put(
-                                        CoreOptions.DELETE_FILE_THREAD_NUM.key(),
+                                        CoreOptions.FILE_OPERATION_THREAD_NUM.key(),
                                         parallelism.toString());
                             }
                         };
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index c1d34a99ef..aa5b8bb70d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -455,7 +455,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                 new ConsumerManager(fileIO, path, snapshotManager().branch()),
                 options.snapshotExpireExecutionMode(),
                 name(),
-                options.forceCreatingSnapshot());
+                options.forceCreatingSnapshot(),
+                options.fileOperationThreadNum());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 7f19c7443c..4f3bb5b71b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -36,6 +36,7 @@ import org.apache.paimon.tag.TagTimeExpire;
 import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.FileOperationThreadPool;
 import org.apache.paimon.utils.IndexFilePathFactories;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -57,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -66,7 +68,6 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
-import static org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
 import static org.apache.paimon.utils.Preconditions.checkState;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
 
@@ -79,18 +80,16 @@ public class TableCommitImpl implements InnerTableCommit {
     @Nullable private final Runnable expireSnapshots;
     @Nullable private final PartitionExpire partitionExpire;
     @Nullable private final TagAutoManager tagAutoManager;
-
     @Nullable private final Duration consumerExpireTime;
     private final ConsumerManager consumerManager;
-
     private final ExecutorService maintainExecutor;
     private final AtomicReference<Throwable> maintainError;
-
     private final String tableName;
+    private final boolean forceCreatingSnapshot;
+    private final ThreadPoolExecutor fileCheckExecutor;
 
     @Nullable private Map<String, String> overwritePartition = null;
     private boolean batchCommitted = false;
-    private final boolean forceCreatingSnapshot;
     private boolean expireForEmptyCommit = true;
 
     public TableCommitImpl(
@@ -102,7 +101,8 @@ public class TableCommitImpl implements InnerTableCommit {
             ConsumerManager consumerManager,
             ExpireExecutionMode expireExecutionMode,
             String tableName,
-            boolean forceCreatingSnapshot) {
+            boolean forceCreatingSnapshot,
+            int threadNum) {
         if (partitionExpire != null) {
             commit.withPartitionExpire(partitionExpire);
         }
@@ -125,6 +125,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
         this.tableName = tableName;
         this.forceCreatingSnapshot = forceCreatingSnapshot;
+        this.fileCheckExecutor = FileOperationThreadPool.getExecutorService(threadNum);
     }
 
     public boolean forceCreatingSnapshot() {
@@ -294,17 +295,12 @@ public class TableCommitImpl implements InnerTableCommit {
                 msg.newFilesIncrement().newIndexFiles().stream()
                         .map(indexFileFactory::toPath)
                         .forEach(files::add);
-                msg.newFilesIncrement().deletedIndexFiles().stream()
-                        .map(indexFileFactory::toPath)
-                        .forEach(files::add);
-                msg.compactIncrement().compactBefore().forEach(collector);
                 msg.compactIncrement().compactAfter().forEach(collector);
                 msg.compactIncrement().newIndexFiles().stream()
                         .map(indexFileFactory::toPath)
                         .forEach(files::add);
-                msg.compactIncrement().deletedIndexFiles().stream()
-                        .map(indexFileFactory::toPath)
-                        .forEach(files::add);
+
+                // skip compact before files, deleted index files
             }
         }
 
@@ -329,7 +325,7 @@ public class TableCommitImpl implements InnerTableCommit {
         List<Path> nonExistFiles =
                 Lists.newArrayList(
                         randomlyExecuteSequentialReturn(
-                                getExecutorService(null),
+                                fileCheckExecutor,
                                 f -> nonExists.test(f) ? singletonList(f) : emptyList(),
                                 resolvedFiles));
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 0bc6e041ab..9d17593c11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -751,7 +751,7 @@ public class FileDeletionTest {
                         store.newStatsFileHandler(),
                         store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE,
                         store.options().cleanEmptyDirectories(),
-                        store.options().deleteFileThreadNum());
+                        store.options().fileOperationThreadNum());
 
         ExpireSnapshots expireSnapshots =
                 new ExpireSnapshotsImpl(
@@ -816,7 +816,7 @@ public class FileDeletionTest {
                         store.newStatsFileHandler(),
                         store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE,
                         store.options().cleanEmptyDirectories(),
-                        store.options().deleteFileThreadNum());
+                        store.options().fileOperationThreadNum());
         ExpireSnapshots expireSnapshots =
                 new ExpireSnapshotsImpl(
                         snapshotManager, changelogManager, snapshotDeletion, tagManager);

Reply via email to