This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0d8085e [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing (#3313)
7b0d8085e is described below

commit 7b0d8085eaab8865e695d11f1da117374533825f
Author: rfyu <[email protected]>
AuthorDate: Mon Nov 18 19:12:45 2024 +0800

    [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing (#3313)
    
    * [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing
    
    * fix UT Error
    
    * Modify configuration names
    
    ---------
    
    Co-authored-by: Xavier Bai <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   | 16 +++++
 .../apache/amoro/server/AmoroServiceContainer.java | 13 +++-
 .../server/optimizing/UnKeyedTableCommit.java      |  7 +-
 .../scan/IcebergTableFileScanHelper.java           |  8 ++-
 .../org/apache/amoro/utils/IcebergThreadPools.java | 81 ++++++++++++++++++++++
 5 files changed, 121 insertions(+), 4 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index f52a0113f..c21a2ca1b 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -59,6 +59,22 @@ public class AmoroManagementConf {
               "Sets the size of the worker pool. The worker pool limits the 
number of tasks concurrently processing "
                   + "manifests in the base table implementation across all 
concurrent planning or commit operations.");
 
+  public static final ConfigOption<Integer> 
TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT =
+      ConfigOptions.key("self-optimizing.plan-manifest-io-thread-count")
+          .intType()
+          .defaultValue(10)
+          .withDescription(
+              "Sets the size of the worker pool. The worker pool limits the 
number of tasks concurrently processing "
+                  + "manifests in the base table implementation across all 
concurrent planning operations.");
+
+  public static final ConfigOption<Integer> 
TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT =
+      ConfigOptions.key("self-optimizing.commit-manifest-io-thread-count")
+          .intType()
+          .defaultValue(10)
+          .withDescription(
+              "Sets the size of the worker pool. The worker pool limits the 
number of tasks concurrently processing "
+                  + "manifests in the base table implementation across all 
concurrent commit operations.");
+
   public static final ConfigOption<Long> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
       ConfigOptions.key("refresh-external-catalogs.interval")
           .longType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 3d70c0acb..400fc6235 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -59,6 +59,7 @@ import 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingSer
 import 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
 import 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
 import 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
+import org.apache.amoro.utils.IcebergThreadPools;
 import org.apache.amoro.utils.JacksonUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.SystemProperties;
@@ -449,10 +450,20 @@ public class AmoroServiceContainer {
     private void setIcebergSystemProperties() {
       int workerThreadPoolSize =
           Math.max(
-              Runtime.getRuntime().availableProcessors(),
+              Runtime.getRuntime().availableProcessors() / 2,
               
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_THREAD_COUNT));
       System.setProperty(
           SystemProperties.WORKER_THREAD_POOL_SIZE_PROP, 
String.valueOf(workerThreadPoolSize));
+      int planningThreadPoolSize =
+          Math.max(
+              Runtime.getRuntime().availableProcessors() / 2,
+              serviceConfig.getInteger(
+                  
AmoroManagementConf.TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT));
+      int commitThreadPoolSize =
+          Math.max(
+              Runtime.getRuntime().availableProcessors() / 2,
+              
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT));
+      IcebergThreadPools.init(planningThreadPoolSize, commitThreadPoolSize);
     }
 
     private void initContainerConfig() {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index 957aa3762..dbeb37a58 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -39,6 +39,7 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.table.UnkeyedTable;
 import org.apache.amoro.utils.ContentFiles;
+import org.apache.amoro.utils.IcebergThreadPools;
 import org.apache.amoro.utils.MixedTableUtil;
 import org.apache.amoro.utils.TableFileUtil;
 import org.apache.amoro.utils.TablePropertyUtil;
@@ -262,7 +263,8 @@ public class UnKeyedTableCommit {
       return;
     }
 
-    RewriteFiles rewriteFiles = transaction.newRewrite();
+    RewriteFiles rewriteFiles =
+        
transaction.newRewrite().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
     if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) {
       long sequenceNumber = 
table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber();
       
rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber);
@@ -282,7 +284,8 @@ public class UnKeyedTableCommit {
   }
 
   private void addDeleteFiles(Transaction transaction, Set<DeleteFile> 
addDeleteFiles) {
-    RowDelta rowDelta = transaction.newRowDelta();
+    RowDelta rowDelta =
+        
transaction.newRowDelta().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
     addDeleteFiles.forEach(rowDelta::addDeletes);
     rowDelta.set(SnapshotSummary.SNAPSHOT_PRODUCER, 
CommitMetaProducer.OPTIMIZE.name());
     rowDelta.commit();
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
index 2f9c7ec30..856a89ebc 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
@@ -20,6 +20,7 @@ package org.apache.amoro.optimizing.scan;
 
 import org.apache.amoro.iceberg.Constants;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.utils.IcebergThreadPools;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
@@ -42,7 +43,12 @@ public class IcebergTableFileScanHelper implements 
TableFileScanHelper {
       return CloseableIterable.empty();
     }
     return CloseableIterable.transform(
-        
table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(),
+        table
+            .newScan()
+            .planWith(IcebergThreadPools.getPlanningExecutor())
+            .useSnapshot(snapshotId)
+            .filter(partitionFilter)
+            .planFiles(),
         this::buildFileScanResult);
   }
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
new file mode 100644
index 000000000..1c19968bc
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+public class IcebergThreadPools {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergThreadPools.class);
+  private static volatile ExecutorService planningExecutor;
+  private static volatile ExecutorService commitExecutor;
+
+  public static void init(int planningThreadPoolSize, int 
commitThreadPoolSize) {
+    if (planningExecutor == null) {
+      synchronized (IcebergThreadPools.class) {
+        if (planningExecutor == null) {
+          planningExecutor =
+              ThreadPools.newWorkerPool("iceberg-planning-pool", 
planningThreadPoolSize);
+        }
+      }
+    }
+    if (commitExecutor == null) {
+      synchronized (IcebergThreadPools.class) {
+        if (commitExecutor == null) {
+          commitExecutor = ThreadPools.newWorkerPool("iceberg-commit-pool", 
commitThreadPoolSize);
+        }
+      }
+    }
+
+    LOG.info(
+        "init iceberg thread pool success, planningExecutor 
size:{},commitExecutor size:{}",
+        planningThreadPoolSize,
+        commitThreadPoolSize);
+  }
+
+  public static ExecutorService getPlanningExecutor() {
+    if (planningExecutor == null) {
+      synchronized (IcebergThreadPools.class) {
+        if (planningExecutor == null) {
+          planningExecutor =
+              ThreadPools.newWorkerPool(
+                  "iceberg-planning-pool", 
Runtime.getRuntime().availableProcessors());
+        }
+      }
+    }
+    return planningExecutor;
+  }
+
+  public static ExecutorService getCommitExecutor() {
+    if (commitExecutor == null) {
+      synchronized (IcebergThreadPools.class) {
+        if (commitExecutor == null) {
+          commitExecutor =
+              ThreadPools.newWorkerPool(
+                  "iceberg-commit-pool", 
Runtime.getRuntime().availableProcessors());
+        }
+      }
+    }
+    return commitExecutor;
+  }
+}

Reply via email to