This is an automated email from the ASF dual-hosted git repository.

yangjiaqi pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-commons.git


The following commit(s) were added to refs/heads/master by this push:
     new 9baef8b  fix(commons):fixed memory leaks occur in HugeGraph Server 
during data writing (#144)
9baef8b is described below

commit 9baef8b255bbed67714215835e6f50b245ae43f6
Author: haohao0103 <[email protected]>
AuthorDate: Tue Jul 30 17:46:59 2024 +0800

    fix(commons):fixed memory leaks occur in HugeGraph Server during data 
writing (#144)
    
    * #2578
    fixed memory leaks occur in HugeGraph Server during data writing
---
 .../java/org/apache/hugegraph/event/EventHub.java  | 20 ++++++++-
 .../org/apache/hugegraph/util/ExecutorUtil.java    | 47 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java 
b/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
index 5107fba..b37c671 100644
--- a/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
+++ b/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
@@ -57,16 +57,23 @@ public class EventHub {
     }
 
     public EventHub(String name) {
-        this(name, 1);
+        this(name, 1, Runtime.getRuntime().availableProcessors() << 2);
     }
 
     public EventHub(String name, int threadSize) {
-        LOG.debug("Create new EventHub {}", name);
+        LOG.debug("Create new EventHub {},threadSize {}", name, threadSize);
         this.name = name;
         this.listeners = new ConcurrentHashMap<>();
         EventHub.init(threadSize);
     }
 
+    public EventHub(String name, int corePoolSize, int maximumPoolSize) {
+        LOG.debug("Create new EventHub {},corePoolSize {}, maximumPoolSize 
{}", name, corePoolSize, maximumPoolSize);
+        this.name = name;
+        this.listeners = new ConcurrentHashMap<>();
+        EventHub.init(corePoolSize, maximumPoolSize);
+    }
+
     public static synchronized void init(int poolSize) {
         if (executor != null) {
             return;
@@ -75,6 +82,15 @@ public class EventHub {
         executor = ExecutorUtil.newFixedThreadPool(poolSize, EVENT_WORKER);
     }
 
+    public static synchronized void init(int corePoolSize, int 
maximumPoolSize) {
+        LOG.debug("Init corePoolSize {}, maximumPoolSize {} for EventHub", 
corePoolSize, maximumPoolSize);
+        if (executor != null) {
+            LOG.debug("EventHub executor already initialized");
+            return;
+        }
+        executor = ExecutorUtil.newDynamicThreadExecutor(EVENT_WORKER, 
corePoolSize, maximumPoolSize);
+    }
+
     public static synchronized boolean destroy(long timeout)
                                                throws InterruptedException {
         E.checkState(executor != null, "EventHub has not been initialized");
diff --git 
a/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java 
b/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
index 9f37c83..f0ff7f1 100644
--- a/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
+++ b/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
@@ -21,13 +21,58 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-
 import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
 
+
+
 public final class ExecutorUtil {
 
+    public static ThreadPoolExecutor newDynamicThreadExecutor(String name,
+                                                              int corePoolSize,
+                                                              int 
maximumPoolSize) {
+
+        long keepAliveTime = 60L;
+        TimeUnit unit = TimeUnit.SECONDS;
+        ThreadFactory factory = new BasicThreadFactory.Builder()
+                .namingPattern(name)
+                .build();
+        CustomBlockingQueue<Runnable> workQueue = new CustomBlockingQueue<>();
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+                corePoolSize,
+                maximumPoolSize,
+                keepAliveTime,
+                unit,
+                workQueue,
+                factory,
+                new ThreadPoolExecutor.CallerRunsPolicy()
+        );
+
+        workQueue.setThreadPoolExecutor(threadPoolExecutor);
+
+        return threadPoolExecutor;
+    }
+
+    static class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> {
+        private ThreadPoolExecutor threadPoolExecutor;
+
+        public void setThreadPoolExecutor(ThreadPoolExecutor 
threadPoolExecutor) {
+            this.threadPoolExecutor = threadPoolExecutor;
+        }
+
+        @Override
+        public boolean offer(E e) {
+            if (threadPoolExecutor.getPoolSize() < 
threadPoolExecutor.getMaximumPoolSize()) {
+                return false;
+            }
+            return super.offer(e);
+        }
+    }
+
     public static ExecutorService newFixedThreadPool(String name) {
         return newFixedThreadPool(1, name);
     }

Reply via email to