This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 54c9d9f61ba branch-3.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 (#52572)
54c9d9f61ba is described below
commit 54c9d9f61bad2fb5b7a401fc8fdd37639ba988ad
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jul 2 10:54:48 2025 +0800
branch-3.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 (#52572)
Backport of #51508
---------
Co-authored-by: wuwenchi <[email protected]>
---
.../src/main/java/org/apache/doris/datasource/ExternalCatalog.java | 2 +-
.../java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java | 5 +++++
.../java/org/apache/doris/datasource/iceberg/IcebergTransaction.java | 4 ++--
.../org/apache/doris/datasource/iceberg/source/IcebergScanNode.java | 2 +-
4 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 5c2857cc0b6..ec91c396eb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1279,7 +1279,7 @@ public abstract class ExternalCatalog
return transactionManager;
}
- public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+ public ThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolWithPreAuth;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 7bc95aa48dd..9f304927b66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -346,4 +347,8 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
private Namespace getNamespace() {
return externalCatalogName.map(Namespace::of).orElseGet(() ->
Namespace.empty());
}
+
+ public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+ return dorisCatalog.getThreadPoolExecutor();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e..797caea0dea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
private void commitAppendTxn(Table table, List<WriteResult>
pendingResults) {
// commit append files.
- AppendFiles appendFiles = table.newAppend();
+ AppendFiles appendFiles =
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
// 1. if dst_tb is a partitioned table, it will return directly.
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will
be emptied.
if (!table.spec().isPartitioned()) {
- OverwriteFiles overwriteFiles = table.newOverwrite();
+ OverwriteFiles overwriteFiles =
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
fileScanTasks.forEach(f ->
overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index d2f31214b0b..858aa0ecf86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -295,7 +295,7 @@ public class IcebergScanNode extends FileQueryScanNode {
this.pushdownIcebergPredicates.add(predicate.toString());
}
- icebergTableScan =
scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
+ icebergTableScan =
scan.planWith(source.getCatalog().getThreadPoolExecutor());
return icebergTableScan;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]