This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new faea393f79 [core] Apply 'file-operation-thread-num' to commit (#6339)
faea393f79 is described below
commit faea393f7998210863357e9bfa032139f17df59b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 26 21:10:14 2025 +0800
[core] Apply 'file-operation-thread-num' to commit (#6339)
---
.../shortcodes/generated/core_configuration.html | 12 +++++------
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++++-----
...hreadPool.java => FileOperationThreadPool.java} | 6 +++---
.../java/org/apache/paimon/AbstractFileStore.java | 6 +++---
.../apache/paimon/operation/FileDeletionBase.java | 4 ++--
.../paimon/operation/ListUnexistingFiles.java | 7 +++----
.../paimon/operation/LocalOrphanFilesClean.java | 4 ++--
.../paimon/table/AbstractFileStoreTable.java | 3 ++-
.../apache/paimon/table/sink/TableCommitImpl.java | 24 +++++++++-------------
.../apache/paimon/operation/FileDeletionTest.java | 4 ++--
10 files changed, 39 insertions(+), 42 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 6cc752a1bf..10b2b4be41 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -362,12 +362,6 @@ under the License.
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns
storage.</td>
</tr>
- <tr>
- <td><h5>delete-file.thread-num</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>Integer</td>
- <td>The maximum number of concurrent deleting files. By default is
the number of processors available to the Java virtual machine.</td>
- </tr>
<tr>
<td><h5>delete.force-produce-changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -452,6 +446,12 @@ under the License.
<td>Boolean</td>
<td>Whether enabled read file index.</td>
</tr>
+ <tr>
+ <td><h5>file-operation.thread-num</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>The maximum number of concurrent file operations. By default
is the number of processors available to the Java virtual machine.</td>
+ </tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7407388449..42293bf5ed 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1769,12 +1769,13 @@ public class CoreOptions implements Serializable {
+ "a forced lookup compaction will be
performed to flush L0 files to higher level. "
+ "This option is only valid when
lookup-compact mode is gentle.");
- public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
- key("delete-file.thread-num")
+ public static final ConfigOption<Integer> FILE_OPERATION_THREAD_NUM =
+ key("file-operation.thread-num")
.intType()
.noDefaultValue()
+ .withFallbackKeys("delete-file.thread-num")
.withDescription(
- "The maximum number of concurrent deleting files. "
+ "The maximum number of concurrent file operations.
"
+ "By default is the number of processors
available to the Java virtual machine.");
public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
@@ -2278,8 +2279,8 @@ public class CoreOptions implements Serializable {
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
}
- public int deleteFileThreadNum() {
- return options.getOptional(DELETE_FILE_THREAD_NUM)
+ public int fileOperationThreadNum() {
+ return options.getOptional(FILE_OPERATION_THREAD_NUM)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
similarity index 89%
rename from paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
rename to paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
index 638d6f9a4c..87c096c50d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-/** Thread pool to delete files using {@link FileIO}. */
-public class FileDeletionThreadPool {
+/** Thread pool to operate files using {@link FileIO}. */
+public class FileOperationThreadPool {
- private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL";
+ private static final String THREAD_NAME = "FILE-OPERATION-THREAD-POOL";
private static ThreadPoolExecutor executorService =
createCachedThreadPool(Runtime.getRuntime().availableProcessors(),
THREAD_NAME);
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2d71f61580..f4a77f877d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -312,7 +312,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
newStatsFileHandler(),
options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
options.cleanEmptyDirectories(),
- options.deleteFileThreadNum());
+ options.fileOperationThreadNum());
}
@Override
@@ -325,7 +325,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
newIndexFileHandler(),
newStatsFileHandler(),
options.cleanEmptyDirectories(),
- options.deleteFileThreadNum());
+ options.fileOperationThreadNum());
}
@Override
@@ -343,7 +343,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
newIndexFileHandler(),
newStatsFileHandler(),
options.cleanEmptyDirectories(),
- options.deleteFileThreadNum());
+ options.fileOperationThreadNum());
}
public abstract Comparator<InternalRow> newKeyComparator();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index dae3d5a0f2..98a761b471 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -35,7 +35,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.DataFilePathFactories;
-import org.apache.paimon.utils.FileDeletionThreadPool;
+import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -102,7 +102,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
this.statsFileHandler = statsFileHandler;
this.cleanEmptyDirectories = cleanEmptyDirectories;
this.deletionBuckets = new HashMap<>();
- this.deleteFileExecutor =
FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
+ this.deleteFileExecutor =
FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
index a704a329c7..c47916871a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ThreadPoolUtils;
@@ -38,8 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
-import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-
/** List what data files recorded in manifests are missing from the
filesystem. */
public class ListUnexistingFiles {
@@ -51,8 +50,8 @@ public class ListUnexistingFiles {
this.table = table;
this.pathFactory = table.store().pathFactory();
this.executor =
- createCachedThreadPool(
- table.coreOptions().deleteFileThreadNum(),
"LIST_UNEXISTING_FILES");
+ FileOperationThreadPool.getExecutorService(
+ table.coreOptions().fileOperationThreadNum());
}
public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition)
throws Exception {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 9456e6c9aa..34ff3fb721 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -89,7 +89,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
this.deleteFiles = new ArrayList<>();
this.executor =
createCachedThreadPool(
- table.coreOptions().deleteFileThreadNum(),
"ORPHAN_FILES_CLEAN");
+ table.coreOptions().fileOperationThreadNum(),
"ORPHAN_FILES_CLEAN");
this.dryRun = dryRun;
}
@@ -276,7 +276,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
: new HashMap<String, String>() {
{
put(
-
CoreOptions.DELETE_FILE_THREAD_NUM.key(),
+
CoreOptions.FILE_OPERATION_THREAD_NUM.key(),
parallelism.toString());
}
};
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index c1d34a99ef..aa5b8bb70d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -455,7 +455,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
new ConsumerManager(fileIO, path, snapshotManager().branch()),
options.snapshotExpireExecutionMode(),
name(),
- options.forceCreatingSnapshot());
+ options.forceCreatingSnapshot(),
+ options.fileOperationThreadNum());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 7f19c7443c..4f3bb5b71b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -36,6 +36,7 @@ import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -57,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -66,7 +68,6 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
-import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static org.apache.paimon.utils.Preconditions.checkState;
import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
@@ -79,18 +80,16 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable private final Runnable expireSnapshots;
@Nullable private final PartitionExpire partitionExpire;
@Nullable private final TagAutoManager tagAutoManager;
-
@Nullable private final Duration consumerExpireTime;
private final ConsumerManager consumerManager;
-
private final ExecutorService maintainExecutor;
private final AtomicReference<Throwable> maintainError;
-
private final String tableName;
+ private final boolean forceCreatingSnapshot;
+ private final ThreadPoolExecutor fileCheckExecutor;
@Nullable private Map<String, String> overwritePartition = null;
private boolean batchCommitted = false;
- private final boolean forceCreatingSnapshot;
private boolean expireForEmptyCommit = true;
public TableCommitImpl(
@@ -102,7 +101,8 @@ public class TableCommitImpl implements InnerTableCommit {
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode,
String tableName,
- boolean forceCreatingSnapshot) {
+ boolean forceCreatingSnapshot,
+ int threadNum) {
if (partitionExpire != null) {
commit.withPartitionExpire(partitionExpire);
}
@@ -125,6 +125,7 @@ public class TableCommitImpl implements InnerTableCommit {
this.tableName = tableName;
this.forceCreatingSnapshot = forceCreatingSnapshot;
+ this.fileCheckExecutor =
FileOperationThreadPool.getExecutorService(threadNum);
}
public boolean forceCreatingSnapshot() {
@@ -294,17 +295,12 @@ public class TableCommitImpl implements InnerTableCommit {
msg.newFilesIncrement().newIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
- msg.newFilesIncrement().deletedIndexFiles().stream()
- .map(indexFileFactory::toPath)
- .forEach(files::add);
- msg.compactIncrement().compactBefore().forEach(collector);
msg.compactIncrement().compactAfter().forEach(collector);
msg.compactIncrement().newIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
- msg.compactIncrement().deletedIndexFiles().stream()
- .map(indexFileFactory::toPath)
- .forEach(files::add);
+
+ // skip compact before files, deleted index files
}
}
@@ -329,7 +325,7 @@ public class TableCommitImpl implements InnerTableCommit {
List<Path> nonExistFiles =
Lists.newArrayList(
randomlyExecuteSequentialReturn(
- getExecutorService(null),
+ fileCheckExecutor,
f -> nonExists.test(f) ? singletonList(f) :
emptyList(),
resolvedFiles));
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 0bc6e041ab..9d17593c11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -751,7 +751,7 @@ public class FileDeletionTest {
store.newStatsFileHandler(),
store.options().changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
store.options().cleanEmptyDirectories(),
- store.options().deleteFileThreadNum());
+ store.options().fileOperationThreadNum());
ExpireSnapshots expireSnapshots =
new ExpireSnapshotsImpl(
@@ -816,7 +816,7 @@ public class FileDeletionTest {
store.newStatsFileHandler(),
store.options().changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
store.options().cleanEmptyDirectories(),
- store.options().deleteFileThreadNum());
+ store.options().fileOperationThreadNum());
ExpireSnapshots expireSnapshots =
new ExpireSnapshotsImpl(
snapshotManager, changelogManager, snapshotDeletion,
tagManager);