This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eb2cbae6e3b [Fix](MySqlLoad) Fix meaningless thread creation every 
time checkpoint mysql load (#26031)
eb2cbae6e3b is described below

commit eb2cbae6e3b43c94e44791f3ae5e4d69e8202dba
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Oct 30 13:55:40 2023 +0800

    [Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint 
mysql load (#26031)
    
    Add a unified thread name setting method
---
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +
 .../apache/doris/common/CustomThreadFactory.java   | 46 ++++++++++++++++++++++
 .../org/apache/doris/load/loadv2/LoadManager.java  |  5 +++
 .../apache/doris/load/loadv2/MysqlLoadManager.java | 17 +++++---
 .../org/apache/doris/load/loadv2/TokenManager.java | 18 +++++----
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++--
 .../org/apache/doris/mtmv/MTMVTaskManager.java     |  6 ++-
 .../doris/scheduler/disruptor/TaskDisruptor.java   |  4 +-
 .../doris/scheduler/manager/TimerJobManager.java   |  4 +-
 9 files changed, 94 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 26b9bbd2e12..caa7d50d917 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1594,6 +1594,8 @@ public class Env {
 
     // start threads that should running on all FE
     private void startNonMasterDaemonThreads() {
+        // start load manager thread
+        loadManager.start();
         tabletStatMgr.start();
         // load and export job label cleaner thread
         labelCleaner.start();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
new file mode 100644
index 00000000000..153131ec251
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CustomThreadFactory implements ThreadFactory {
+    private final AtomicInteger poolNumber = new AtomicInteger(1);
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    public CustomThreadFactory(String name) {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+        namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement(), 0);
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+            t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 73d4d1a57ad..3f05e2c5a98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -104,6 +104,11 @@ public class LoadManager implements Writable {
         this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
     }
 
+    public void start() {
+        tokenManager.start();
+        mysqlLoadManager.start();
+    }
+
     /**
      * This method will be invoked by the broker load(v2) now.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index c25699cc63a..ee71b28385d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
@@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit;
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
-    private final ThreadPoolExecutor mysqlLoadPool;
+    private  ThreadPoolExecutor mysqlLoadPool;
     private final TokenManager tokenManager;
 
     private static class MySqlLoadContext {
@@ -137,14 +138,20 @@ public class MysqlLoadManager {
     }
 
     private final Map<String, MySqlLoadContext> loadContextMap = new 
ConcurrentHashMap<>();
-    private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
-    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
+    private  EvictingQueue<MySqlLoadFailRecord> failedRecords;
+    private ScheduledExecutorService periodScheduler;
 
     public MysqlLoadManager(TokenManager tokenManager) {
+        this.tokenManager = tokenManager;
+    }
+
+    public void start() {
+        this.periodScheduler = Executors.newScheduledThreadPool(1,
+                new CustomThreadFactory("mysql-load-fail-record-cleaner"));
         int poolSize = Config.mysql_load_thread_pool;
         // MySqlLoad pool can accept 4 + 4 * 5 = 24  requests by default.
-        this.mysqlLoadPool = 
ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql 
Load", true);
-        this.tokenManager = tokenManager;
+        this.mysqlLoadPool = 
ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
+                "Mysql Load", true);
         this.failedRecords = 
EvictingQueue.create(Config.mysql_load_in_memory_record);
         this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 
24, TimeUnit.HOURS);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
index f4cf4518212..80f6c3f9b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.FrontendService;
@@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit;
 public class TokenManager {
     private static final Logger LOG = LogManager.getLogger(TokenManager.class);
 
-    private final int thriftTimeoutMs = 300 * 1000;
-    private final EvictingQueue<String> tokenQueue;
-    private final ScheduledExecutorService tokenGenerator;
+    private  int thriftTimeoutMs = 300 * 1000;
+    private  EvictingQueue<String> tokenQueue;
+    private  ScheduledExecutorService tokenGenerator;
 
     public TokenManager() {
+    }
+
+    public void start() {
         this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
         // init one token to avoid async issue.
         this.tokenQueue.offer(generateNewToken());
-        this.tokenGenerator = Executors.newScheduledThreadPool(1);
-        this.tokenGenerator.scheduleAtFixedRate(() -> {
-            tokenQueue.offer(generateNewToken());
-        }, 0, Config.token_generate_period_hour, TimeUnit.HOURS);
+        this.tokenGenerator = Executors.newScheduledThreadPool(1,
+                new CustomThreadFactory("token-generator"));
+        this.tokenGenerator.scheduleAtFixedRate(() -> 
tokenQueue.offer(generateNewToken()), 0,
+                Config.token_generate_period_hour, TimeUnit.HOURS);
     }
 
     private String generateNewToken() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 7295f40b602..b58f26b863a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
@@ -66,9 +67,11 @@ public class MTMVJobManager {
 
     private final MTMVTaskManager taskManager;
 
-    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-job-period-scheduler"));
 
-    private ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
 
     private final ReentrantReadWriteLock rwLock;
 
@@ -86,13 +89,15 @@ public class MTMVJobManager {
             // check the scheduler before using it
             // since it may be shutdown when master change to follower without 
process shutdown.
             if (periodScheduler.isShutdown()) {
-                periodScheduler = Executors.newScheduledThreadPool(1);
+                periodScheduler = Executors.newScheduledThreadPool(1,
+                        new CustomThreadFactory("mtmv-job-period-scheduler"));
             }
 
             registerJobs();
 
             if (cleanerScheduler.isShutdown()) {
-                cleanerScheduler = Executors.newScheduledThreadPool(1);
+                cleanerScheduler = Executors.newScheduledThreadPool(1,
+                        new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
             }
             cleanerScheduler.scheduleAtFixedRate(() -> {
                 if (!Env.getCurrentEnv().isMaster()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index d6e370480bb..138ede9e075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.mtmv.MTMVUtils.TaskState;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
@@ -65,13 +66,14 @@ public class MTMVTaskManager {
     // keep track of all the completed tasks
     private final Deque<MTMVTask> historyTasks = 
Queues.newLinkedBlockingDeque();
 
-    private ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1,
+            new CustomThreadFactory("mtmv-task-scheduler"));
 
     private final AtomicInteger failedTaskCount = new AtomicInteger(0);
 
     public void startTaskScheduler() {
         if (taskScheduler.isShutdown()) {
-            taskScheduler = Executors.newScheduledThreadPool(1);
+            taskScheduler = Executors.newScheduledThreadPool(1, new 
CustomThreadFactory("mtmv-task-scheduler"));
         }
         taskScheduler.scheduleAtFixedRate(() -> {
             checkRunningTask();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index dcf346e3ebf..a8d98831f21 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.scheduler.constants.TaskType;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -28,7 +29,6 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.util.DaemonThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
@@ -75,7 +75,7 @@ public class TaskDisruptor implements Closeable {
             };
 
     public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager 
transientTaskManager) {
-        ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
+        ThreadFactory producerThreadFactory = new 
CustomThreadFactory("task-disruptor-producer");
         disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
                 ProducerType.SINGLE, new BlockingWaitStrategy());
         WorkHandler<TaskEvent>[] workers = new 
TaskHandler[consumerThreadCount];
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index 9c53d96443d..c7a728cf049 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.manager;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.io.Writable;
@@ -87,7 +88,8 @@ public class TimerJobManager implements Closeable, Writable {
     }
 
     public void start() {
-        dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
+        dorisTimer = new HashedWheelTimer(new 
CustomThreadFactory("hashed-wheel-timer"),
+                1, TimeUnit.SECONDS, 660);
         dorisTimer.start();
         Long currentTimeMs = System.currentTimeMillis();
         jobMap.forEach((jobId, job) -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to