This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 54c9d9f61ba branch-3.1: [fix](iceberg)Fix the thread pool issue used 
for commit. #51508 (#52572)
54c9d9f61ba is described below

commit 54c9d9f61bad2fb5b7a401fc8fdd37639ba988ad
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jul 2 10:54:48 2025 +0800

    branch-3.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 
(#52572)
    
    bp #51508
    
    ---------
    
    Co-authored-by: wuwenchi <[email protected]>
---
 .../src/main/java/org/apache/doris/datasource/ExternalCatalog.java   | 2 +-
 .../java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java | 5 +++++
 .../java/org/apache/doris/datasource/iceberg/IcebergTransaction.java | 4 ++--
 .../org/apache/doris/datasource/iceberg/source/IcebergScanNode.java  | 2 +-
 4 files changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 5c2857cc0b6..ec91c396eb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1279,7 +1279,7 @@ public abstract class ExternalCatalog
         return transactionManager;
     }
 
-    public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+    public ThreadPoolExecutor getThreadPoolExecutor() {
         return threadPoolWithPreAuth;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 7bc95aa48dd..9f304927b66 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -346,4 +347,8 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
     private Namespace getNamespace() {
         return externalCatalogName.map(Namespace::of).orElseGet(() -> 
Namespace.empty());
     }
+
+    public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+        return dorisCatalog.getThreadPoolExecutor();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e..797caea0dea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
 
     private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
         // commit append files.
-        AppendFiles appendFiles = table.newAppend();
+        AppendFiles appendFiles = 
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
             // 1. if dst_tb is a partitioned table, it will return directly.
             // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will 
be emptied.
             if (!table.spec().isPartitioned()) {
-                OverwriteFiles overwriteFiles = table.newOverwrite();
+                OverwriteFiles overwriteFiles = 
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
                 try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
                     fileScanTasks.forEach(f -> 
overwriteFiles.deleteFile(f.file()));
                 } catch (IOException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index d2f31214b0b..858aa0ecf86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -295,7 +295,7 @@ public class IcebergScanNode extends FileQueryScanNode {
             this.pushdownIcebergPredicates.add(predicate.toString());
         }
 
-        icebergTableScan = 
scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
+        icebergTableScan = 
scan.planWith(source.getCatalog().getThreadPoolExecutor());
 
         return icebergTableScan;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to