This is an automated email from the ASF dual-hosted git repository.
yangjiaqi pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-hugegraph-commons.git
The following commit(s) were added to refs/heads/master by this push:
new 9baef8b fix(commons):fixed memory leaks occur in HugeGraph Server
during data writing (#144)
9baef8b is described below
commit 9baef8b255bbed67714215835e6f50b245ae43f6
Author: haohao0103 <[email protected]>
AuthorDate: Tue Jul 30 17:46:59 2024 +0800
fix(commons):fixed memory leaks occur in HugeGraph Server during data
writing (#144)
* #2578
fixed memory leaks occur in HugeGraph Server during data writing
---
.../java/org/apache/hugegraph/event/EventHub.java | 20 ++++++++-
.../org/apache/hugegraph/util/ExecutorUtil.java | 47 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 3 deletions(-)
diff --git
a/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
b/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
index 5107fba..b37c671 100644
--- a/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
+++ b/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
@@ -57,16 +57,23 @@ public class EventHub {
}
public EventHub(String name) {
- this(name, 1);
+ this(name, 1, Runtime.getRuntime().availableProcessors() << 2);
}
public EventHub(String name, int threadSize) {
- LOG.debug("Create new EventHub {}", name);
+ LOG.debug("Create new EventHub {},threadSize {}", name, threadSize);
this.name = name;
this.listeners = new ConcurrentHashMap<>();
EventHub.init(threadSize);
}
+ public EventHub(String name, int corePoolSize, int maximumPoolSize) {
+ LOG.debug("Create new EventHub {},corePoolSize {}, maximumPoolSize
{}", name, corePoolSize, maximumPoolSize);
+ this.name = name;
+ this.listeners = new ConcurrentHashMap<>();
+ EventHub.init(corePoolSize, maximumPoolSize);
+ }
+
public static synchronized void init(int poolSize) {
if (executor != null) {
return;
@@ -75,6 +82,15 @@ public class EventHub {
executor = ExecutorUtil.newFixedThreadPool(poolSize, EVENT_WORKER);
}
+ public static synchronized void init(int corePoolSize, int
maximumPoolSize) {
+ LOG.debug("Init corePoolSize {}, maximumPoolSize {} for EventHub",
corePoolSize, maximumPoolSize);
+ if (executor != null) {
+ LOG.debug("EventHub executor already initialized");
+ return;
+ }
+ executor = ExecutorUtil.newDynamicThreadExecutor(EVENT_WORKER,
corePoolSize, maximumPoolSize);
+ }
+
public static synchronized boolean destroy(long timeout)
throws InterruptedException {
E.checkState(executor != null, "EventHub has not been initialized");
diff --git
a/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
b/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
index 9f37c83..f0ff7f1 100644
--- a/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
+++ b/hugegraph-common/src/main/java/org/apache/hugegraph/util/ExecutorUtil.java
@@ -21,13 +21,58 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-
import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
+
+
public final class ExecutorUtil {
+ public static ThreadPoolExecutor newDynamicThreadExecutor(String name,
+ int corePoolSize,
+ int
maximumPoolSize) {
+
+ long keepAliveTime = 60L;
+ TimeUnit unit = TimeUnit.SECONDS;
+ ThreadFactory factory = new BasicThreadFactory.Builder()
+ .namingPattern(name)
+ .build();
+ CustomBlockingQueue<Runnable> workQueue = new CustomBlockingQueue<>();
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+ corePoolSize,
+ maximumPoolSize,
+ keepAliveTime,
+ unit,
+ workQueue,
+ factory,
+ new ThreadPoolExecutor.CallerRunsPolicy()
+ );
+
+ workQueue.setThreadPoolExecutor(threadPoolExecutor);
+
+ return threadPoolExecutor;
+ }
+
+ static class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> {
+ private ThreadPoolExecutor threadPoolExecutor;
+
+ public void setThreadPoolExecutor(ThreadPoolExecutor
threadPoolExecutor) {
+ this.threadPoolExecutor = threadPoolExecutor;
+ }
+
+ @Override
+ public boolean offer(E e) {
+ if (threadPoolExecutor.getPoolSize() <
threadPoolExecutor.getMaximumPoolSize()) {
+ return false;
+ }
+ return super.offer(e);
+ }
+ }
+
public static ExecutorService newFixedThreadPool(String name) {
return newFixedThreadPool(1, name);
}