This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new e66acccfd fix(store): improve some potential lock & type cast issues (#2895)
e66acccfd is described below
commit e66acccfda1a47a6a5cc5bb1f3ad9762c88f7069
Author: Soyan <[email protected]>
AuthorDate: Sat Nov 1 04:08:14 2025 +0800
fix(store): improve some potential lock & type cast issues (#2895)
* update(store): fix some problems and clean up code
- chore(store): clean up some comments
- chore(store): use Slf4j instead of System.out to print logs
- update(store): use a more reasonable timeout setting
- update(store): add a close method to CopyOnWriteCache to avoid a potential memory leak
- update(store): delete a duplicated beginTx() statement
- update(store): extract parameters for the compaction thread pool (move to the configuration file in the future)
- update(store): add a default branch in AggregationFunctions
- update(store): fix a potential concurrency problem in QueryExecutor
* Update hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
---------
Co-authored-by: Peng Junzhi <[email protected]>
---
.../org/apache/hugegraph/pd/client/PDConfig.java | 2 +-
.../org/apache/hugegraph/store/cli/cmd/Load.java | 13 +++-------
.../apache/hugegraph/store/cli/cmd/MultiQuery.java | 2 +-
.../apache/hugegraph/store/cli/cmd/ScanTable.java | 2 +-
.../store/client/query/QueryExecutor.java | 30 +++++++++++++++-------
.../store/query/func/AggregationFunctions.java | 7 +++--
.../store/business/BusinessHandlerImpl.java | 12 +++++++--
.../store/cmd/request/DestroyRaftRequest.java | 2 +-
.../hugegraph/store/util/CopyOnWriteCache.java | 24 ++++++++++++++++-
.../org/apache/hugegraph/store/node/AppConfig.java | 4 +--
.../node/grpc/query/AggregativeQueryObserver.java | 6 +----
11 files changed, 70 insertions(+), 34 deletions(-)
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
index a14c32425..5555bae30 100644
--- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
@@ -103,7 +103,7 @@ public final class PDConfig {
public PDConfig setAuthority(String userName, String pwd) {
this.userName = userName;
String auth = userName + ':' + pwd;
- this.authority = new String(Base64.getEncoder().encode(auth.getBytes(UTF_8)));
+ this.authority = Base64.getEncoder().encodeToString(auth.getBytes(UTF_8));
return this;
}
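For reference, a standalone sketch of the Base64 change above, using only the JDK's java.util.Base64 (the class name and values here are illustrative, not project code): encodeToString() returns the Base64 text directly, avoiding the extra byte[]-to-String round trip of the old form.

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64AuthorityExample {
    public static void main(String[] args) {
        String auth = "user" + ':' + "secret";
        // Old form: encode to bytes, then wrap the bytes in a String.
        String viaBytes = new String(Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)));
        // New form: encodeToString() produces the Base64 text in one step.
        String direct = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
        System.out.println(viaBytes.equals(direct)); // prints: true
    }
}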
diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
index 0fbe10d01..a134d689c 100644
--- a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
+++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
@@ -121,10 +121,9 @@ public class Load extends Command {
for (int i = 0; i < readerSize; i++) {
int fi = i;
new Thread(() -> {
- try {
- InputStreamReader isr = new InputStreamReader(new FileInputStream(split[fi]),
-         StandardCharsets.UTF_8);
- BufferedReader reader = new BufferedReader(isr);
+ try(InputStreamReader isr = new InputStreamReader(new FileInputStream(split[fi]),
+         StandardCharsets.UTF_8);
+     BufferedReader reader = new BufferedReader(isr)) {
long count = 0;
String line;
try {
@@ -146,9 +145,6 @@ public class Load extends Command {
}
} catch (Exception e) {
throw new RuntimeException(e);
- } finally {
- isr.close();
- reader.close();
}
} catch (Exception e) {
log.error("send data with error:", e);
@@ -158,13 +154,12 @@ public class Load extends Command {
}).start();
}
latch.await();
- loadThread.join();
completed.set(true);
+ loadThread.join();
}
public boolean put(String table, List<String> keys) {
HgStoreSession session = storeClient.openSession(graph);
- session.beginTx();
try {
session.beginTx();
for (String key : keys) {
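As a side note on the Load.java change above, a minimal sketch of the try-with-resources pattern it adopts (plain JDK I/O; the method and path are illustrative): resources declared in the try header are closed automatically, in reverse order, even if an exception is thrown, which is what makes the manual finally block with isr.close()/reader.close() unnecessary.

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ReadLinesExample {
    public static long countLines(String path) throws IOException {
        // Both the reader and the underlying stream are closed automatically.
        try (InputStreamReader isr = new InputStreamReader(new FileInputStream(path),
                                                           StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr)) {
            long count = 0;
            while (reader.readLine() != null) {
                count++;
            }
            return count;
        }
    }
}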
diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
index 6bcc4e3d9..2128e7fe0 100644
--- a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
+++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
@@ -101,7 +101,7 @@ public class MultiQuery extends Command {
current = (HgOwnerKey) queue[finalI].poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- //
+ Thread.currentThread().interrupt();
}
}
if (current == null) {
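The added Thread.currentThread().interrupt() restores the interrupt flag that BlockingQueue.poll() clears when it throws InterruptedException, so outer loops can still observe the interruption. A small generic sketch of the pattern (the queue and method are illustrative, not project code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopExample {
    static String pollOnce(BlockingQueue<String> queue) {
        try {
            // Returns null on timeout; throws InterruptedException if the thread is interrupted.
            return queue.poll(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Re-set the flag so callers can detect and react to the interruption.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}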
diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
index e46e59795..e8ebda772 100644
--- a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
+++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
@@ -73,7 +73,7 @@ public class ScanTable extends Command {
if (iterator.hasNext()) {
iterator.next();
position = iterator.position();
- System.out.println("count is " + count);
+ log.info("count is {}", count);
} else {
position = null;
}
diff --git a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
index e5e667273..b4102bb37 100644
--- a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
+++ b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
@@ -73,12 +73,15 @@ public class QueryExecutor {
private final HugeGraphSupplier supplier;
- private long timeout = 1800_000;
+ /**
+ * Timeout duration for StreamObserver receiving response
+ */
+ private long timeout = 60_000;
/**
* Used for testing single machine
*/
- public static String filterStore = null;
+ private static final ThreadLocal<String> filterStore = new ThreadLocal<>();
public QueryExecutor(HgStoreNodePartitioner nodePartitioner,
HugeGraphSupplier supplier,
Long timeout) {
@@ -123,12 +126,20 @@ public class QueryExecutor {
if (o1 == null && o2 == null) {
return 0;
}
-
- if (o1 != null) {
- return ((KvElement) o1).compareTo((KvElement) o2);
+ if (o1 != null && o2 != null) {
+ if (o1 instanceof KvElement && o2 instanceof KvElement) {
+ return ((KvElement) o1).compareTo((KvElement) o2);
+ }
+ if (!(o1 instanceof KvElement)) {
+ throw new IllegalStateException(
+ "Expected KvElement but got: " +
o1.getClass().getName());
+ }
+ // !(o2 instanceof KvElement)
+ throw new IllegalStateException(
+ "Expected KvElement but got: " +
o2.getClass().getName());
}
- return 0;
+ return o1 != null ? 1 : -1;
});
iterator = new StreamFinalAggregationIterator<>(iterator, query.getFuncList());
@@ -277,9 +288,10 @@ public class QueryExecutor {
}
}
- if (filterStore != null) {
- return tasks.containsKey(filterStore) ?
- List.of(Tuple2.of(filterStore, tasks.get(filterStore))) : List.of();
+ if (filterStore.get() != null) {
+ String filterStoreStr = filterStore.get();
+ return tasks.containsKey(filterStoreStr) ?
+ List.of(Tuple2.of(filterStoreStr, tasks.get(filterStoreStr))) : List.of();
}
return tasks.entrySet().stream()
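Switching filterStore from a mutable public static field to a ThreadLocal means each test thread only sees the value it set itself, which removes the cross-thread interference the commit message calls a potential concurrency problem. A generic sketch of the pattern (names are illustrative, not the project's API):

public class StoreFilterExample {
    // Each thread gets its own slot; threads that never call set() see null.
    private static final ThreadLocal<String> filterStore = new ThreadLocal<>();

    static void filterTo(String storeName) {
        filterStore.set(storeName);
    }

    static boolean accepts(String storeName) {
        String filter = filterStore.get();
        return filter == null || filter.equals(storeName);
    }

    static void clearFilter() {
        // Important on pooled threads, otherwise the value leaks to the next task.
        filterStore.remove();
    }
}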
diff --git a/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java b/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
index ee84f8789..23157b1e4 100644
--- a/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
+++ b/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
@@ -79,8 +79,11 @@ public class AggregationFunctions {
((AtomicFloat) buffer).getAndAdd((Float) record);
break;
default:
- // throw new Exception ?
- break;
+ // throw new Exception
+ throw new IllegalStateException(
+ "Unsupported buffer type: " +
buffer.getClass().getName() +
+ ". Supported types: AtomicLong, AtomicInteger,
AtomicDouble, AtomicFloat"
+ );
}
}
}
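Throwing in the default branch surfaces an unexpected buffer type immediately instead of silently dropping the record. A condensed, generic sketch of the idea (types and method are illustrative, not the project's AggregationFunctions API):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class SumBufferExample {
    static void add(Object buffer, Number record) {
        if (buffer instanceof AtomicLong) {
            ((AtomicLong) buffer).getAndAdd(record.longValue());
        } else if (buffer instanceof AtomicInteger) {
            ((AtomicInteger) buffer).getAndAdd(record.intValue());
        } else {
            // Fail fast on programming errors rather than losing data silently.
            throw new IllegalStateException(
                    "Unsupported buffer type: " + buffer.getClass().getName());
        }
    }
}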
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
index 307e5fc57..9287bfe26 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
@@ -130,13 +130,17 @@ public class BusinessHandlerImpl implements BusinessHandler {
}};
private static final Map<Integer, String> dbNames = new ConcurrentHashMap<>();
private static HugeGraphSupplier mockGraphSupplier = null;
- private static final int compactionThreadCount = 64;
private static final ConcurrentMap<String, AtomicInteger> pathLock = new ConcurrentHashMap<>();
private static final ConcurrentMap<Integer, AtomicInteger> compactionState = new ConcurrentHashMap<>();
+ // Default core thread count
+ private static final int compactionThreadCount = 64;
+ private static final int compactionMaxThreadCount = 256;
+ // Max size of compaction queue
+ private static final int compactionQueueSize = 1000;
private static final ThreadPoolExecutor compactionPool =
        ExecutorUtil.createExecutor(PoolNames.COMPACT, compactionThreadCount,
-                                   compactionThreadCount * 4, Integer.MAX_VALUE);
+                                   compactionMaxThreadCount, compactionQueueSize);
private static final int timeoutMillis = 6 * 3600 * 1000;
private final BinaryElementSerializer serializer = BinaryElementSerializer.getInstance();
private final DirectBinarySerializer directBinarySerializer = new DirectBinarySerializer();
@@ -1667,4 +1671,8 @@ public class BusinessHandlerImpl implements BusinessHandler {
};
}
}
+
+ public static void clearCache() {
+ GRAPH_SUPPLIER_CACHE.clear();
+ }
}
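Replacing the Integer.MAX_VALUE queue with a fixed maximum thread count and a bounded queue keeps compaction back-pressure under control. ExecutorUtil.createExecutor is project-specific, so its exact semantics are not shown here; a rough java.util.concurrent equivalent (the keep-alive and rejection policy are assumptions, the sizes mirror the constants above) might look like:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompactionPoolExample {
    static ThreadPoolExecutor newCompactionPool() {
        int coreThreads = 64;   // default core thread count
        int maxThreads = 256;   // upper bound on worker threads
        int queueSize = 1000;   // bounded work queue instead of Integer.MAX_VALUE
        return new ThreadPoolExecutor(coreThreads, maxThreads,
                                      60L, TimeUnit.SECONDS,
                                      new LinkedBlockingQueue<>(queueSize),
                                      new ThreadPoolExecutor.CallerRunsPolicy());
    }
}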
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
index ecd7e7cf0..b9e61837d 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
@@ -27,7 +27,7 @@ import lombok.Data;
@Data
public class DestroyRaftRequest extends HgCmdBase.BaseRequest {
- private List<String> graphNames = new ArrayList<>();
+ private final List<String> graphNames = new ArrayList<>();
public void addGraphName(String graphName) {
graphNames.add(graphName);
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
index f07a5a018..b20eac39f 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
@@ -20,6 +20,7 @@ package org.apache.hugegraph.store.util;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -29,7 +30,9 @@ import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
-//FIXME Missing shutdown method
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class CopyOnWriteCache<K, V> implements ConcurrentMap<K, V> {
// Scheduled executor service for periodically clearing the cache.
@@ -263,4 +266,23 @@ public class CopyOnWriteCache<K, V> implements ConcurrentMap<K, V> {
return null;
}
}
+
+ public void close(){
+ scheduledExecutor.shutdown();
+ try {
+ boolean isTerminated = scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS);
+ if (!isTerminated) {
+ List<Runnable> runnables = scheduledExecutor.shutdownNow();
+ log.info("CopyOnWriteCache shutting down with {} tasks left", runnables.size());
+
+ boolean isNowTerminated = scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS);
+ if (!isNowTerminated) {
+ log.warn("Failed to shutdown CopyOnWriteCache thread pool");
+ }
+ }
+ }catch (InterruptedException e) {
+ scheduledExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}
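The new close() follows the standard two-phase shutdown recipe for a ScheduledExecutorService: request an orderly shutdown, wait, force-cancel anything left, and restore the interrupt flag if the wait itself is interrupted. A standalone sketch of that recipe (the executor and timeouts here are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    static void closeQuietly(ScheduledExecutorService executor) {
        executor.shutdown();                        // stop accepting new tasks
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();             // cancel queued and running tasks
                executor.awaitTermination(30, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();     // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        closeQuietly(Executors.newSingleThreadScheduledExecutor());
    }
}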
diff --git a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
index a8a122327..3f1624c08 100644
--- a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
+++ b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
@@ -278,9 +278,9 @@ public class AppConfig {
private int fetchBatchSize;
/**
- * the timeout of request fetch
+ * the timeout of request fetch (ms)
*/
- @Value("${query.push-down.fetch_timeout:3600000}")
+ @Value("${query.push-down.fetch_timeout:300000}")
private long fetchTimeOut;
/**
diff --git a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
index 199d3ba55..0ba569cb9 100644
--- a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
+++ b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
@@ -60,10 +60,6 @@ public class AggregativeQueryObserver implements StreamObserver<QueryRequest> {
private final AtomicInteger consumeCount = new AtomicInteger(0);
private final AtomicInteger sendCount = new AtomicInteger(0);
private final AtomicBoolean clientCanceled = new AtomicBoolean(false);
- // private final ThreadLocal<QueryResponse.Builder> localBuilder = ThreadLocal.withInitial
- // (QueryResponse::newBuilder);
-// private final ThreadLocal<Kv.Builder> localKvBuilder = ThreadLocal.withInitial
-// (Kv::newBuilder);
private final BinaryElementSerializer serializer = BinaryElementSerializer.getInstance();
private final StreamObserver<QueryResponse> sender;
private volatile ScanIterator iterator = null;
@@ -328,7 +324,7 @@ public class AggregativeQueryObserver implements StreamObserver<QueryRequest> {
try {
recordCount++;
executePipeline(itr.next());
- if (System.currentTimeMillis() - current > timeout * 1000) {
+ if (System.nanoTime() - current > timeout * 1_000_000) {
throw new RuntimeException("execution timeout");
}
} catch (EarlyStopException ignore) {
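The last hunk replaces the wall-clock check with System.nanoTime(), which is monotonic and unaffected by clock adjustments; the start timestamp must then also come from nanoTime() and the timeout must be scaled to nanoseconds for the comparison. A small generic sketch of that check (the loop body and units are illustrative, not the project's code):

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class TimeoutLoopExample {
    static void drain(Iterator<?> itr, long timeoutMillis) {
        long start = System.nanoTime();             // monotonic start point
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (itr.hasNext()) {
            itr.next();                             // process the record
            if (System.nanoTime() - start > timeoutNanos) {
                throw new RuntimeException("execution timeout");
            }
        }
    }
}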