This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 7b0d8085e [AMORO-3312]Separate the shared iceberg-worker-pool for
planning and committing (#3313)
7b0d8085e is described below
commit 7b0d8085eaab8865e695d11f1da117374533825f
Author: rfyu <[email protected]>
AuthorDate: Mon Nov 18 19:12:45 2024 +0800
[AMORO-3312]Separate the shared iceberg-worker-pool for planning and
committing (#3313)
* [AMORO-3312]Separate the shared iceberg-worker-pool for planning and
committing
* fix UT Error
* Modify configuration names
---------
Co-authored-by: Xavier Bai <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 16 +++++
.../apache/amoro/server/AmoroServiceContainer.java | 13 +++-
.../server/optimizing/UnKeyedTableCommit.java | 7 +-
.../scan/IcebergTableFileScanHelper.java | 8 ++-
.../org/apache/amoro/utils/IcebergThreadPools.java | 81 ++++++++++++++++++++++
5 files changed, 121 insertions(+), 4 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index f52a0113f..c21a2ca1b 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -59,6 +59,22 @@ public class AmoroManagementConf {
"Sets the size of the worker pool. The worker pool limits the
number of tasks concurrently processing "
+ "manifests in the base table implementation across all
concurrent planning or commit operations.");
+ public static final ConfigOption<Integer>
TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT =
+ ConfigOptions.key("self-optimizing.plan-manifest-io-thread-count")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "Sets the size of the worker pool. The worker pool limits the
number of tasks concurrently processing "
+ + "manifests in the base table implementation across all
concurrent planning operations.");
+
+ public static final ConfigOption<Integer>
TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT =
+ ConfigOptions.key("self-optimizing.commit-manifest-io-thread-count")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "Sets the size of the worker pool. The worker pool limits the
number of tasks concurrently processing "
+ + "manifests in the base table implementation across all
concurrent commit operations.");
+
public static final ConfigOption<Long> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
ConfigOptions.key("refresh-external-catalogs.interval")
.longType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 3d70c0acb..400fc6235 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -59,6 +59,7 @@ import
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingSer
import
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
import
org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
+import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.JacksonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.SystemProperties;
@@ -449,10 +450,20 @@ public class AmoroServiceContainer {
private void setIcebergSystemProperties() {
int workerThreadPoolSize =
Math.max(
- Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors() / 2,
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_THREAD_COUNT));
System.setProperty(
SystemProperties.WORKER_THREAD_POOL_SIZE_PROP,
String.valueOf(workerThreadPoolSize));
+ int planningThreadPoolSize =
+ Math.max(
+ Runtime.getRuntime().availableProcessors() / 2,
+ serviceConfig.getInteger(
+
AmoroManagementConf.TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT));
+ int commitThreadPoolSize =
+ Math.max(
+ Runtime.getRuntime().availableProcessors() / 2,
+
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT));
+ IcebergThreadPools.init(planningThreadPoolSize, commitThreadPoolSize);
}
private void initContainerConfig() {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index 957aa3762..dbeb37a58 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -39,6 +39,7 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.ContentFiles;
+import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.amoro.utils.TablePropertyUtil;
@@ -262,7 +263,8 @@ public class UnKeyedTableCommit {
return;
}
- RewriteFiles rewriteFiles = transaction.newRewrite();
+ RewriteFiles rewriteFiles =
+
transaction.newRewrite().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) {
long sequenceNumber =
table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber();
rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber);
@@ -282,7 +284,8 @@ public class UnKeyedTableCommit {
}
private void addDeleteFiles(Transaction transaction, Set<DeleteFile>
addDeleteFiles) {
- RowDelta rowDelta = transaction.newRowDelta();
+ RowDelta rowDelta =
+
transaction.newRowDelta().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
addDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.set(SnapshotSummary.SNAPSHOT_PRODUCER,
CommitMetaProducer.OPTIMIZE.name());
rowDelta.commit();
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
index 2f9c7ec30..856a89ebc 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java
@@ -20,6 +20,7 @@ package org.apache.amoro.optimizing.scan;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
@@ -42,7 +43,12 @@ public class IcebergTableFileScanHelper implements
TableFileScanHelper {
return CloseableIterable.empty();
}
return CloseableIterable.transform(
-
table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(),
+ table
+ .newScan()
+ .planWith(IcebergThreadPools.getPlanningExecutor())
+ .useSnapshot(snapshotId)
+ .filter(partitionFilter)
+ .planFiles(),
this::buildFileScanResult);
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
new file mode 100644
index 000000000..1c19968bc
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+public class IcebergThreadPools { // Static holder for two executors: one for planning, one for commit.
+ // Both executors are created via ThreadPools.newWorkerPool and kept in static volatile fields.
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergThreadPools.class);
+ private static volatile ExecutorService planningExecutor;
+ private static volatile ExecutorService commitExecutor;
+ // init: creates each executor with the given size only if it is still null, then logs the sizes.
+ public static void init(int planningThreadPoolSize, int
commitThreadPoolSize) {
+ if (planningExecutor == null) { // an executor that already exists is kept as-is
+ synchronized (IcebergThreadPools.class) {
+ if (planningExecutor == null) { // re-check while holding the class lock
+ planningExecutor =
+ ThreadPools.newWorkerPool("iceberg-planning-pool",
planningThreadPoolSize);
+ }
+ }
+ }
+ if (commitExecutor == null) { // same create-if-null sequence for the commit executor
+ synchronized (IcebergThreadPools.class) {
+ if (commitExecutor == null) {
+ commitExecutor = ThreadPools.newWorkerPool("iceberg-commit-pool",
commitThreadPoolSize);
+ }
+ }
+ }
+ // NOTE(review): logged unconditionally with the requested sizes, even if an executor already existed.
+ LOG.info(
+ "init iceberg thread pool success, planningExecutor
size:{},commitExecutor size:{}",
+ planningThreadPoolSize,
+ commitThreadPoolSize);
+ }
+ // Returns the planning executor; if still null, creates one sized by availableProcessors().
+ public static ExecutorService getPlanningExecutor() {
+ if (planningExecutor == null) {
+ synchronized (IcebergThreadPools.class) {
+ if (planningExecutor == null) { // re-check while holding the class lock
+ planningExecutor =
+ ThreadPools.newWorkerPool(
+ "iceberg-planning-pool",
Runtime.getRuntime().availableProcessors());
+ }
+ }
+ }
+ return planningExecutor;
+ }
+ // Returns the commit executor; if still null, creates one sized by availableProcessors().
+ public static ExecutorService getCommitExecutor() {
+ if (commitExecutor == null) {
+ synchronized (IcebergThreadPools.class) {
+ if (commitExecutor == null) { // re-check while holding the class lock
+ commitExecutor =
+ ThreadPools.newWorkerPool(
+ "iceberg-commit-pool",
Runtime.getRuntime().availableProcessors());
+ }
+ }
+ }
+ return commitExecutor;
+ }
+}