This is an automated email from the ASF dual-hosted git repository.
ming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new c99cfcddc fix(server):fix graph server cache notifier mechanism (#2729)
c99cfcddc is described below
commit c99cfcddc9f397513a7288748471b5ed58031f20
Author: haohao0103 <[email protected]>
AuthorDate: Wed Apr 2 09:53:22 2025 +0800
fix(server):fix graph server cache notifier mechanism (#2729)
* #2728
* fix some typo & tiny improve
---------
Co-authored-by: imbajin <[email protected]>
---
.../org/apache/hugegraph/StandardHugeGraph.java | 63 +++++++++------
.../backend/cache/CachedGraphTransaction.java | 22 +++---
.../hugegraph/backend/tx/GraphTransaction.java | 92 +++++++++++-----------
3 files changed, 97 insertions(+), 80 deletions(-)
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index eb991c0f6..ed9cd4234 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -17,6 +17,7 @@
package org.apache.hugegraph;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -577,11 +578,7 @@ public class StandardHugeGraph implements HugeGraph {
private AbstractSerializer serializer() {
String name = this.configuration.get(CoreOptions.SERIALIZER);
LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
- AbstractSerializer serializer =
SerializerFactory.serializer(this.configuration, name);
- if (serializer == null) {
- throw new HugeException("Can't load serializer with name " + name);
- }
- return serializer;
+ return SerializerFactory.serializer(this.configuration, name);
}
private Analyzer analyzer() {
@@ -597,7 +594,7 @@ public class StandardHugeGraph implements HugeGraph {
}
protected void reloadRamtable(boolean loadFromFile) {
- // Expect triggered manually, like gremlin job
+ // Expect triggered manually, like a gremlin job
if (this.ramtable != null) {
this.ramtable.reload(loadFromFile, this.name);
} else {
@@ -1615,37 +1612,51 @@ public class StandardHugeGraph implements HugeGraph {
private static class AbstractCacheNotifier implements CacheNotifier {
+ public static final Logger LOG =
Log.logger(AbstractCacheNotifier.class);
+
private final EventHub hub;
private final EventListener cacheEventListener;
public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
this.hub = hub;
this.cacheEventListener = event -> {
- Object[] args = event.args();
- E.checkArgument(args.length > 0 && args[0] instanceof String,
- "Expect event action argument");
- if (Cache.ACTION_INVALIDED.equals(args[0])) {
- event.checkArgs(String.class, HugeType.class,
Object.class);
- HugeType type = (HugeType) args[1];
- Object ids = args[2];
- if (ids instanceof Id[]) {
- // argument type mismatch: proxy.invalid2(type,Id[]ids)
- proxy.invalid2(type, (Id[]) ids);
- } else if (ids instanceof Id) {
- proxy.invalid(type, (Id) ids);
- } else {
- E.checkArgument(false, "Unexpected argument: %s", ids);
+ try {
+ LOG.info("Received event: {}", event);
+ Object[] args = event.args();
+ E.checkArgument(args.length > 0 && args[0] instanceof
String,
+ "Expect event action argument");
+ String action = (String) args[0];
+ LOG.debug("Event action: {}", action);
+ if (Cache.ACTION_INVALIDED.equals(action)) {
+ event.checkArgs(String.class, HugeType.class,
Object.class);
+ HugeType type = (HugeType) args[1];
+ Object ids = args[2];
+ if (ids instanceof Id[]) {
+ LOG.debug("Calling proxy.invalid2 with type: {}, IDs: {}", type, Arrays.toString((Id[]) ids));
+ proxy.invalid2(type, (Id[]) ids);
+ } else if (ids instanceof Id) {
+ LOG.debug("Calling proxy.invalid with type: {}, ID: {}", type, ids);
+ proxy.invalid(type, (Id) ids);
+ } else {
+ LOG.error("Unexpected argument: {}", ids);
+ E.checkArgument(false, "Unexpected argument: %s",
ids);
+ }
+ return true;
+ } else if (Cache.ACTION_CLEARED.equals(action)) {
+ event.checkArgs(String.class, HugeType.class);
+ HugeType type = (HugeType) args[1];
+ LOG.debug("Calling proxy.clear with type: {}", type);
+ proxy.clear(type);
+ return true;
}
- return true;
- } else if (Cache.ACTION_CLEARED.equals(args[0])) {
- event.checkArgs(String.class, HugeType.class);
- HugeType type = (HugeType) args[1];
- proxy.clear(type);
- return true;
+ } catch (Exception e) {
+ LOG.error("Error processing cache event: {}",
e.getMessage(), e);
}
+ LOG.warn("Event {} not handled",event);
return false;
};
this.hub.listen(Events.CACHE, this.cacheEventListener);
+ LOG.info("Cache event listener registered successfully. cacheEventListener {}",this.cacheEventListener);
}
@Override
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
index 83ab7f51a..cbf23e14d 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -133,7 +133,9 @@ public final class CachedGraphTransaction extends
GraphTransaction {
}
return false;
};
- this.store().provider().listen(this.storeEventListener);
+ if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){
+ this.store().provider().listen(this.storeEventListener);
+ }
// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
@@ -182,19 +184,21 @@ public final class CachedGraphTransaction extends
GraphTransaction {
}
return false;
};
- EventHub graphEventHub = this.params().graphEventHub();
- if (!graphEventHub.containsListener(Events.CACHE)) {
+ if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){
+ EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}
private void unlistenChanges() {
- // Unlisten store event
- this.store().provider().unlisten(this.storeEventListener);
-
- // Unlisten cache event
- EventHub graphEventHub = this.params().graphEventHub();
- graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+ String graphName = this.params().name();
+ if (graphCacheListenStatus.remove(graphName) != null) {
+ EventHub graphEventHub = this.params().graphEventHub();
+ graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+ }
+ if (storeEventListenStatus.remove(graphName) != null) {
+ this.store().provider().unlisten(this.storeEventListener);
+ }
}
private void notifyChanges(String action, HugeType type, Id[] ids) {
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
index 7f441574e..e50fa5c6f 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -139,6 +140,10 @@ public class GraphTransaction extends IndexableTransaction
{
private final int verticesCapacity;
private final int edgesCapacity;
+ protected static final ConcurrentHashMap<String, Boolean>
graphCacheListenStatus =
+ new ConcurrentHashMap<>();
+ protected static final ConcurrentHashMap<String, Boolean>
storeEventListenStatus =
+ new ConcurrentHashMap<>();
public GraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
@@ -400,8 +405,8 @@ public class GraphTransaction extends IndexableTransaction {
/*
* If the backend stores vertex together with edges, it's edges
* would be removed after removing vertex. Otherwise, if the
- * backend stores vertex which is separated from edges, it's
- * edges should be removed manually when removing vertex.
+ * backend stores vertex which is separated from edges,
+ * its edges should be removed manually when removing vertex.
*/
this.doRemove(this.serializer.writeVertex(v.prepareRemoved()));
this.indexTx.updateVertexIndex(v, true);
@@ -435,7 +440,7 @@ public class GraphTransaction extends IndexableTransaction {
if (this.store().features().supportsUpdateVertexProperty()) {
// Update vertex index without removed property
this.indexTx.updateVertexIndex(prop.element(), false);
- // Eliminate the property(OUT and IN owner edge)
+ // Eliminate the property (OUT and IN owner edge)
this.doEliminate(this.serializer.writeVertexProperty(prop));
} else {
// Override vertex
@@ -447,12 +452,12 @@ public class GraphTransaction extends
IndexableTransaction {
if (this.store().features().supportsUpdateEdgeProperty()) {
// Update edge index without removed property
this.indexTx.updateEdgeIndex(prop.element(), false);
- // Eliminate the property(OUT and IN owner edge)
+ // Eliminate the property (OUT and IN owner edge)
this.doEliminate(this.serializer.writeEdgeProperty(prop));
this.doEliminate(this.serializer.writeEdgeProperty(
prop.switchEdgeOwner()));
} else {
- // Override edge(it will be in addedEdges & updatedEdges)
+ // Override edge (it will be in addedEdges & updatedEdges)
this.addEdge(prop.element());
}
}
@@ -464,7 +469,7 @@ public class GraphTransaction extends IndexableTransaction {
if (this.store().features().supportsUpdateVertexProperty()) {
// Update vertex index with new added property
this.indexTx.updateVertexIndex(prop.element(), false);
- // Append new property(OUT and IN owner edge)
+ // Append new property (OUT and IN owner edge)
this.doAppend(this.serializer.writeVertexProperty(prop));
} else {
// Override vertex
@@ -474,9 +479,9 @@ public class GraphTransaction extends IndexableTransaction {
assert p.element().type().isEdge();
HugeEdgeProperty<?> prop = (HugeEdgeProperty<?>) p;
if (this.store().features().supportsUpdateEdgeProperty()) {
- // Update edge index with new added property
+ // Update edge-index with new added property
this.indexTx.updateEdgeIndex(prop.element(), false);
- // Append new property(OUT and IN owner edge)
+ // Append new property (OUT and IN owner edge)
this.doAppend(this.serializer.writeEdgeProperty(prop));
this.doAppend(this.serializer.writeEdgeProperty(
prop.switchEdgeOwner()));
@@ -560,12 +565,12 @@ public class GraphTransaction extends
IndexableTransaction {
QueryList<Number> queries = this.optimizeQueries(query, q -> {
boolean isIndexQuery = q instanceof IdQuery;
assert isIndexQuery || isConditionQuery || q == query;
- // Need to fallback if there are uncommitted records
+ // Need to fall back if there are uncommitted records
boolean fallback = hasUpdate;
Number result;
if (fallback) {
- // Here just ignore it, and do fallback later
+ // Here just ignore it, and do fall back later
result = null;
} else if (!isIndexQuery || !isConditionQuery) {
// It's a sysprop-query, let parent tx do it
@@ -578,7 +583,7 @@ public class GraphTransaction extends IndexableTransaction {
assert query instanceof ConditionQuery;
OptimizedType optimized = ((ConditionQuery) query).optimized();
if (this.optimizeAggrByIndex && optimized ==
OptimizedType.INDEX) {
- // The ids size means results count (assume no left index)
+ // The id's size means result count (assume no left index)
result = q.idsSize();
} else {
assert !fallback;
@@ -587,7 +592,7 @@ public class GraphTransaction extends IndexableTransaction {
}
}
- // Can't be optimized, then do fallback
+ // Can't be optimized, then do fall back
if (fallback) {
assert result == null;
assert q.resultType().isVertex() || q.resultType().isEdge();
@@ -629,7 +634,7 @@ public class GraphTransaction extends IndexableTransaction {
/*
* No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
* update only can add nullable properties and user data, which is
- * unconcerned with add vertex
+ * unconcerned with added vertex
*/
this.beforeWrite();
this.addedVertices.put(vertex.id(), vertex);
@@ -762,21 +767,16 @@ public class GraphTransaction extends
IndexableTransaction {
return this.queryVerticesByIds(vertexIds, false, false,
HugeType.SERVER);
}
- return this.queryVerticesByIds(vertexIds, false, false,
- HugeType.VERTEX);
+ return this.queryVerticesByIds(vertexIds, false, false,
HugeType.VERTEX);
}
- protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds,
- boolean adjacentVertex,
- boolean checkMustExist) {
- return this.queryVerticesByIds(vertexIds, adjacentVertex,
checkMustExist,
- HugeType.VERTEX);
+ protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean
adjacentVertex,
+ boolean checkMustExist) {
+ return this.queryVerticesByIds(vertexIds, adjacentVertex,
checkMustExist, HugeType.VERTEX);
}
- protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds,
- boolean adjacentVertex,
- boolean checkMustExist,
- HugeType type) {
+ protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean
adjacentVertex,
+ boolean checkMustExist,
HugeType type) {
Query.checkForceCapacity(vertexIds.length);
// NOTE: allowed duplicated vertices if query by duplicated ids
@@ -890,7 +890,7 @@ public class GraphTransaction extends IndexableTransaction {
/*
* No need to lock EDGE_LABEL_ADD_UPDATE, because edge label
* update only can add nullable properties and user data, which is
- * unconcerned with add edge
+ * unconcerned with added edge
*/
this.beforeWrite();
this.addedEdges.put(edge.id(), edge);
@@ -1132,7 +1132,7 @@ public class GraphTransaction extends
IndexableTransaction {
E.checkState(vertex != null,
"No owner for updating property '%s'", prop.key());
- // Add property in memory for new created vertex
+ // Add property in memory for newly created vertex
if (vertex.fresh()) {
// The owner will do property update
vertex.setProperty(prop);
@@ -1177,7 +1177,7 @@ public class GraphTransaction extends
IndexableTransaction {
List<Id> primaryKeyIds = vertex.schemaLabel().primaryKeys();
E.checkArgument(!primaryKeyIds.contains(propKey.id()),
"Can't remove primary key '%s'", prop.key());
- // Remove property in memory for new created vertex
+ // Remove property in memory for newly created vertex
if (vertex.fresh()) {
// The owner will do property update
vertex.removeProperty(propKey.id());
@@ -1210,7 +1210,7 @@ public class GraphTransaction extends
IndexableTransaction {
E.checkState(edge != null,
"No owner for updating property '%s'", prop.key());
- // Add property in memory for new created edge
+ // Add property in memory for newly created edge
if (edge.fresh()) {
// The owner will do property update
edge.setProperty(prop);
@@ -1250,11 +1250,11 @@ public class GraphTransaction extends
IndexableTransaction {
if (!edge.hasProperty(propKey.id())) {
return;
}
- // Check is removing sort key
+ // Check is removing a sort key
List<Id> sortKeyIds = edge.schemaLabel().sortKeys();
E.checkArgument(!sortKeyIds.contains(prop.propertyKey().id()),
"Can't remove sort key '%s'", prop.key());
- // Remove property in memory for new created edge
+ // Remove property in memory for newly created edge
if (edge.fresh()) {
// The owner will do property update
edge.removeProperty(propKey.id());
@@ -1280,7 +1280,7 @@ public class GraphTransaction extends
IndexableTransaction {
}
/**
- * Construct one edge condition query based on source vertex, direction and
+ * Construct one-edge condition query based on source vertex, direction and
* edge labels
*
* @param sourceVertex source vertex of edge
@@ -1340,8 +1340,8 @@ public class GraphTransaction extends
IndexableTransaction {
}
private static ConditionQuery constructEdgesQuery(Id sourceVertex,
- Directions direction,
- List<Id> edgeLabels) {
+ Directions direction,
+ List<Id> edgeLabels) {
E.checkState(sourceVertex != null,
"The edge query must contain source vertex");
E.checkState(direction != null,
@@ -1454,7 +1454,7 @@ public class GraphTransaction extends
IndexableTransaction {
/*
* Supported query:
* 1.query just by edge label
- * 2.query just by PROPERTIES (like containsKey,containsValue)
+ * 2.query just by PROPERTIES (like containsKey, containsValue)
* 3.query with scan
*/
if (query.containsCondition(HugeKeys.LABEL) ||
@@ -1570,8 +1570,8 @@ public class GraphTransaction extends
IndexableTransaction {
}
if (vertexIdList.size() != filterVertexList.size()) {
- // Modify on the copied relation to avoid affecting other
query
- Condition.Relation relation =
+ // Modify on the copied relation to avoid affecting
another query
+ Condition.Relation relation =
query.copyRelationAndUpdateQuery(HugeKeys.OWNER_VERTEX);
relation.value(filterVertexList);
}
@@ -1603,7 +1603,8 @@ public class GraphTransaction extends
IndexableTransaction {
*/
query.resetUserpropConditions();
- if (this.storeFeatures().supportsFatherAndSubEdgeLabel() &&
query.condition(HugeKeys.SUB_LABEL) == null) {
+ if (this.storeFeatures().supportsFatherAndSubEdgeLabel() &&
+ query.condition(HugeKeys.SUB_LABEL) == null) {
query.eq(HugeKeys.SUB_LABEL, el.id());
}
LOG.debug("Query edges by sortKeys: {}", query);
@@ -1613,7 +1614,7 @@ public class GraphTransaction extends
IndexableTransaction {
/*
* Query only by sysprops, like: by vertex label, by edge label.
- * NOTE: we assume sysprops would be indexed by backend store
+ * NOTE: we assume sysprops would be indexed by backend store,
* but we don't support query edges only by direction/target-vertex.
*/
if (query.allSysprop()) {
@@ -1842,7 +1843,7 @@ public class GraphTransaction extends
IndexableTransaction {
}
/*
* No need to lock INDEX_LABEL_ADD_UPDATE, because index label
- * update only can add user data, which is unconcerned with
+ * update only can add user data, which is unconcerned with
* update property
*/
this.beforeWrite();
@@ -1919,7 +1920,8 @@ public class GraphTransaction extends
IndexableTransaction {
}
if (cq.optimized() == OptimizedType.INDEX) {
// g.E().hasLabel(xxx).has(yyy)
- // consider OptimizedType.INDEX_FILTER occurred in
org.apache.hugegraph.core.EdgeCoreTest.testQueryCount
+ // consider OptimizedType.INDEX_FILTER occurred in
org.apache.hugegraph.core
+ // .EdgeCoreTest.testQueryCount
try {
this.indexTx.asyncRemoveIndexLeft(cq, elem);
} catch (Throwable e) {
@@ -1934,7 +1936,7 @@ public class GraphTransaction extends
IndexableTransaction {
if (cq.existLeftIndex(elem.id())) {
/*
* Both have correct and left index, wo should return true
- * but also needs to cleaned up left index
+ * but also needs to clean up left index
*/
try {
this.indexTx.asyncRemoveIndexLeft(cq, elem);
@@ -2067,8 +2069,8 @@ public class GraphTransaction extends
IndexableTransaction {
Set<V> txResults = InsertionOrderUtil.newSet();
/*
- * Collect added/updated records
- * Records in memory have higher priority than query from backend store
+ * Collect added/updated records.
+ * Records in memory have higher priority than a query from backend
store
*/
for (V elem : addedTxRecords.values()) {
if (query.reachLimit(txResults.size())) {
@@ -2275,7 +2277,7 @@ public class GraphTransaction extends
IndexableTransaction {
while (iter.hasNext()) {
consumer.accept(iter.next());
/*
- * Commit per batch to avoid too much data in single
commit,
+ * Commit per batch to avoid too much data in a single
commit,
* especially for Cassandra backend
*/
this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);
@@ -2300,7 +2302,7 @@ public class GraphTransaction extends
IndexableTransaction {
if (label.equals(elemLabel)) {
consumer.accept(e);
/*
- * Commit per batch to avoid too much data in
single
+ * Commit per batch to avoid too much data in a
single
* commit, especially for Cassandra backend
*/
this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);