This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c6458aa77 [enhancement](nereids) Execute sync analyze task with
multi-thread (#22211)
4c6458aa77 is described below
commit 4c6458aa778a94d27d15ee579803c9aec584fa31
Author: AKIRA <[email protected]>
AuthorDate: Mon Jul 31 15:05:07 2023 +0800
[enhancement](nereids) Execute sync analyze task with multi-thread (#22211)
It was executed in sequentialy, which may cause a lot of time
---
.../apache/doris/statistics/AnalysisManager.java | 64 +++++++++++++++-------
1 file changed, 45 insertions(+), 19 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cb6a3dfe5c..66f0b94aa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.Util;
@@ -59,6 +60,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -82,6 +84,9 @@ import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -738,12 +743,23 @@ public class AnalysisManager extends Daemon implements
Writable {
ConnectContext ctx = ConnectContext.get();
try {
ctxToSyncTask.put(ctx, syncTaskCollection);
- syncTaskCollection.execute();
+ ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
+ syncTaskCollection.execute(syncExecPool);
} finally {
ctxToSyncTask.remove(ctx);
}
}
+ private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
+ String poolName = "SYNC ANALYZE THREAD POOL";
+ return new ThreadPoolExecutor(0, 64,
+ 0, TimeUnit.SECONDS,
+ new SynchronousQueue(),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC
ANALYZE" + "-%d")
+ .build(), new BlockedPolicy(poolName,
+ (int)
TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
+ }
+
public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
if (dropStatsStmt.dropExpired) {
Env.getCurrentEnv().getStatisticsCleaner().clear();
@@ -844,28 +860,38 @@ public class AnalysisManager extends Daemon implements
Writable {
tasks.forEach(BaseAnalysisTask::cancel);
}
- public void execute() {
- List<String> colNames = new ArrayList<>();
- List<String> errorMessages = new ArrayList<>();
+ public void execute(ThreadPoolExecutor executor) {
+ List<String> colNames = Collections.synchronizedList(new
ArrayList<>());
+ List<String> errorMessages = Collections.synchronizedList(new
ArrayList<>());
+ CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (BaseAnalysisTask task : tasks) {
- if (cancelled) {
- colNames.add(task.info.colName);
- errorMessages.add("Cancelled");
- continue;
- }
- try {
- task.doExecute();
- updateSyncTaskStatus(task, AnalysisState.FINISHED);
- } catch (Throwable t) {
- colNames.add(task.info.colName);
- errorMessages.add(Util.getRootCauseMessage(t));
- updateSyncTaskStatus(task, AnalysisState.FAILED);
- LOG.warn("Failed to analyze, info: {}", task, t);
- }
+ executor.submit(() -> {
+ try {
+ if (cancelled) {
+ return;
+ }
+ try {
+ task.doExecute();
+ updateSyncTaskStatus(task, AnalysisState.FINISHED);
+ } catch (Throwable t) {
+ colNames.add(task.info.colName);
+ errorMessages.add(Util.getRootCauseMessage(t));
+ updateSyncTaskStatus(task, AnalysisState.FAILED);
+ LOG.warn("Failed to analyze, info: {}", task, t);
+ }
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ }
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException t) {
+ LOG.warn("Thread got interrupted when waiting sync analyze
task execution finished", t);
}
if (!colNames.isEmpty()) {
throw new RuntimeException("Failed to analyze following
columns:[" + String.join(",", colNames)
- + "] Reasons: " + String.join(",", errorMessages));
+ + "] Reasons: " + String.join(",", errorMessages));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]